diff --git a/ntex-bytes/CHANGELOG.md b/ntex-bytes/CHANGELOG.md index 2f6c0b89..22633698 100644 --- a/ntex-bytes/CHANGELOG.md +++ b/ntex-bytes/CHANGELOG.md @@ -1,5 +1,13 @@ # Changes +## 0.1.5 (2021-12-02) + +* Split,freeze,truncate operations produce inline Bytes object if possible + +* Refactor Vec representation + +* Introduce memory pools + ## 0.1.4 (2021-06-27) * Reduce size of Option by using NonNull diff --git a/ntex-bytes/Cargo.toml b/ntex-bytes/Cargo.toml index 347d39b8..e1b43582 100644 --- a/ntex-bytes/Cargo.toml +++ b/ntex-bytes/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "ntex-bytes" -version = "0.1.4" +version = "0.1.5" license = "MIT" -authors = ["Carl Lerche "] +authors = ["Nikolay Kim ", "Carl Lerche "] description = "Types and traits for working with bytes (bytes crate fork)" documentation = "https://docs.rs/ntex-bytes" repository = "https://github.com/ntex-rs/ntex-bytes" @@ -12,9 +12,12 @@ categories = ["network-programming", "data-structures"] edition = "2018" [dependencies] -serde = "1.0" -bytes = "1.0.1" +bitflags = "1.3" +bytes = "1.0.0" +serde = "1.0.0" +futures-core = { version = "0.3.18", default-features = false, features = ["alloc"] } [dev-dependencies] serde_test = "1.0" serde_json = "1.0" +ntex = "0.4.10" diff --git a/ntex-bytes/src/bytes.rs b/ntex-bytes/src/bytes.rs index 3630328b..0d3ea7ae 100644 --- a/ntex-bytes/src/bytes.rs +++ b/ntex-bytes/src/bytes.rs @@ -1,10 +1,11 @@ use std::borrow::{Borrow, BorrowMut}; use std::iter::{FromIterator, Iterator}; use std::ops::{Deref, DerefMut, RangeBounds}; -use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; -use std::sync::atomic::{self, AtomicPtr, AtomicUsize}; +use std::sync::atomic::Ordering::{Acquire, Relaxed, Release}; +use std::sync::atomic::{self, AtomicUsize}; use std::{cmp, fmt, hash, mem, ptr, ptr::NonNull, slice, usize}; +use crate::pool::{AsPoolRef, PoolId, PoolRef}; use crate::{buf::IntoIter, buf::UninitSlice, debug, Buf, BufMut}; /// A reference counted contiguous slice of memory. @@ -92,11 +93,11 @@ use crate::{buf::IntoIter, buf::UninitSlice, debug, Buf, BufMut}; /// /// # Inline bytes /// -/// As an optimization, when the slice referenced by a `Bytes` or `BytesMut` -/// handle is small enough [^1], `with_capacity` will avoid the allocation -/// by inlining the slice directly in the handle. In this case, a clone is no -/// longer "shallow" and the data will be copied. Converting from a `Vec` will -/// never use inlining. +/// As an optimization, when the slice referenced by a `Bytes` handle is small +/// enough [^1]. In this case, a clone is no longer "shallow" and the data will +/// be copied. Converting from a `Vec` will never use inlining. `BytesMut` does +/// not support data inlining and always allocates, but during converion to `Bytes` +/// data from `BytesMut` could be inlined. /// /// [^1]: Small enough: 31 bytes on 64 bit systems, 15 on 32 bit systems. /// @@ -225,7 +226,7 @@ pub struct BytesMut { // * `ptr: *mut u8` // * `len: usize` // * `cap: usize` -// * `arc: AtomicPtr` +// * `arc: *mut Shared` // // ## `ptr: *mut u8` // @@ -251,7 +252,7 @@ pub struct BytesMut { // // When in "inlined" mode, `cap` is used as part of the inlined buffer. // -// ## `arc: AtomicPtr` +// ## `arc: *mut Shared` // // When `Inner` is in allocated mode (backed by Vec or Arc>), this // will be the pointer to the `Arc` structure tracking the ref count for the @@ -324,8 +325,14 @@ struct Inner { // other shenanigans to make it work. 
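The representation notes above boil down to a two-bit tag stored in the `arc` word: because the `Shared`/`SharedVec` allocations are pointer-aligned, the low bits are free to encode the storage kind, and masking them off recovers the real pointer (see the `KIND_*` constants and `shared_vec()` further down in this file). A simplified, standalone sketch of that tagging scheme — illustrative only, not the crate's actual code; the `Kind` enum and helpers are hypothetical:

```rust
// Standalone illustration of the two-bit kind tag carried in the `arc` word.
// The constant values mirror the KIND_* constants in this diff; the enum and
// helper function are hypothetical, for explanation only.
const KIND_INLINE: usize = 0b01;
const KIND_STATIC: usize = 0b10;
const KIND_VEC: usize = 0b11;
const KIND_MASK: usize = 0b11;
const KIND_UNMASK: usize = !KIND_MASK;

#[derive(Debug, PartialEq)]
enum Kind {
    Arc,    // 0b00: the word itself is an aligned `*mut Shared`
    Inline, // 0b01: the handle stores the bytes inline
    Static, // 0b10: the handle points at a `'static` slice
    Vec,    // 0b11: word & KIND_UNMASK is a `*mut SharedVec`
}

fn kind_of(arc_word: usize) -> Kind {
    match arc_word & KIND_MASK {
        0b00 => Kind::Arc,
        0b01 => Kind::Inline,
        0b10 => Kind::Static,
        _ => Kind::Vec,
    }
}

fn main() {
    // Pretend allocation address; real allocations are at least 4-byte
    // aligned, so the two tag bits are always zero in the raw pointer.
    let alloc: usize = 0x7f00_0000_1000;

    // `Inner::from_slice` stores `ptr ^ KIND_VEC`; `shared_vec()` strips the tag.
    let tagged = alloc ^ KIND_VEC;
    assert_eq!(kind_of(tagged), Kind::Vec);
    assert_eq!(tagged & KIND_UNMASK, alloc);

    assert_eq!(kind_of(alloc), Kind::Arc);
    assert_eq!(kind_of(KIND_INLINE), Kind::Inline);
    assert_eq!(kind_of(KIND_STATIC), Kind::Static);
}
```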
struct Shared { vec: Vec, - original_capacity_repr: usize, ref_count: AtomicUsize, + pool: PoolRef, +} + +struct SharedVec { + cap: usize, + ref_count: AtomicUsize, + pool: PoolRef, } // Buffer storage strategy flags. @@ -334,25 +341,10 @@ const KIND_INLINE: usize = 0b01; const KIND_STATIC: usize = 0b10; const KIND_VEC: usize = 0b11; const KIND_MASK: usize = 0b11; +const KIND_UNMASK: usize = !KIND_MASK; -// The max original capacity value. Any `Bytes` allocated with a greater initial -// capacity will default to this. -const MAX_ORIGINAL_CAPACITY_WIDTH: usize = 17; -// The original capacity algorithm will not take effect unless the originally -// allocated capacity was at least 1kb in size. -const MIN_ORIGINAL_CAPACITY_WIDTH: usize = 10; -// The original capacity is stored in powers of 2 starting at 1kb to a max of -// 64kb. Representing it as such requires only 3 bits of storage. -const ORIGINAL_CAPACITY_MASK: usize = 0b11100; -const ORIGINAL_CAPACITY_OFFSET: usize = 2; - -// When the storage is in the `Vec` representation, the pointer can be advanced -// at most this value. This is due to the amount of storage available to track -// the offset is usize - number of KIND bits and number of ORIGINAL_CAPACITY -// bits. -const VEC_POS_OFFSET: usize = 5; -const MAX_VEC_POS: usize = usize::MAX >> VEC_POS_OFFSET; -const NOT_VEC_POS_MASK: usize = 0b11111; +const MIN_NON_ZERO_CAP: usize = 64; +const SHARED_VEC_SIZE: usize = mem::size_of::(); // Bit op constants for extracting the inline length value from the `arc` field. const INLINE_LEN_MASK: usize = 0b1111_1100; @@ -363,21 +355,18 @@ const INLINE_LEN_OFFSET: usize = 2; // storage flag, so the data is shifted by a byte. On big endian systems, the // data starts at the beginning of the struct. #[cfg(target_endian = "little")] -const INLINE_DATA_OFFSET: isize = 1; +const INLINE_DATA_OFFSET: isize = 2; #[cfg(target_endian = "big")] const INLINE_DATA_OFFSET: isize = 0; -#[cfg(target_pointer_width = "64")] -const PTR_WIDTH: usize = 64; -#[cfg(target_pointer_width = "32")] -const PTR_WIDTH: usize = 32; - // Inline buffer capacity. This is the size of `Inner` minus 1 byte for the // metadata. #[cfg(target_pointer_width = "64")] -const INLINE_CAP: usize = 4 * 8 - 1; +const INLINE_CAP: usize = 4 * 8 - 2; #[cfg(target_pointer_width = "32")] -const INLINE_CAP: usize = 4 * 4 - 1; +const INLINE_CAP: usize = 4 * 4 - 2; + +const EMPTY: &[u8] = &[]; /* * @@ -386,36 +375,6 @@ const INLINE_CAP: usize = 4 * 4 - 1; */ impl Bytes { - /// Creates a new `Bytes` with the specified capacity. - /// - /// The returned `Bytes` will be able to hold at least `capacity` bytes - /// without reallocating. If `capacity` is under `4 * size_of::() - 1`, - /// then `BytesMut` will not allocate. - /// - /// It is important to note that this function does not specify the length - /// of the returned `Bytes`, but only the capacity. - /// - /// # Examples - /// - /// ``` - /// use ntex_bytes::Bytes; - /// - /// let mut bytes = Bytes::with_capacity(64); - /// - /// // `bytes` contains no data, even though there is capacity - /// assert_eq!(bytes.len(), 0); - /// - /// bytes.extend_from_slice(&b"hello world"[..]); - /// - /// assert_eq!(&bytes[..], b"hello world"); - /// ``` - #[inline] - pub fn with_capacity(capacity: usize) -> Bytes { - Bytes { - inner: Inner::with_capacity(capacity), - } - } - /// Creates a new empty `Bytes`. /// /// This will not allocate and the returned `Bytes` handle will be empty. 
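With `Bytes::with_capacity` removed above and `INLINE_CAP` reduced by one byte of metadata, small read-only buffers are now obtained either directly (`copy_from_slice`, `From<Vec<u8>>`) or by freezing a `BytesMut`. A minimal sketch of the behaviour, assuming the 0.1.5 API as it appears in this diff:

```rust
// Minimal illustration of the new inlining rules (ntex-bytes 0.1.5 per this
// diff): BytesMut always allocates, but freezing a small buffer yields an
// inline Bytes; slices above INLINE_CAP stay heap-backed.
use ntex_bytes::{Bytes, BytesMut};

fn main() {
    // `BytesMut` itself never inlines...
    let mut buf = BytesMut::with_capacity(64);
    buf.extend_from_slice(b"hello");

    // ...but converting to `Bytes` copies small data into the handle,
    // so the frozen value holds no heap allocation.
    let frozen: Bytes = buf.freeze();
    assert!(frozen.is_inline());

    // Anything larger than INLINE_CAP (30 bytes on 64-bit targets with the
    // new layout) remains heap-backed and clones by reference counting.
    let big = Bytes::copy_from_slice(&[0u8; 64][..]);
    assert!(!big.is_inline());
    assert!(!big.clone().is_inline());
}
```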
@@ -430,10 +389,9 @@ impl Bytes { /// ``` #[inline] pub const fn new() -> Bytes { - // Make it a named const to work around - // "unsizing casts are not allowed in const fn" - const EMPTY: &[u8] = &[]; - Bytes::from_static(EMPTY) + Bytes { + inner: Inner::empty(), + } } /// Creates a new `Bytes` from a static slice. @@ -490,11 +448,11 @@ impl Bytes { /// /// # Examples /// ``` - /// use ntex_bytes::Bytes; + /// use ntex_bytes::{Bytes, BytesMut}; /// - /// assert!(Bytes::with_capacity(4).is_inline()); - /// assert!(!Bytes::from(Vec::with_capacity(4)).is_inline()); - /// assert!(!Bytes::with_capacity(1024).is_inline()); + /// assert!(Bytes::from(BytesMut::from(&[0, 0, 0, 0][..])).is_inline()); + /// assert!(Bytes::from(Vec::with_capacity(4)).is_inline()); + /// assert!(!Bytes::from(&[0; 1024][..]).is_inline()); /// ``` pub fn is_inline(&self) -> bool { self.inner.is_inline() @@ -502,7 +460,29 @@ impl Bytes { /// Creates `Bytes` instance from slice, by copying it. pub fn copy_from_slice(data: &[u8]) -> Self { - BytesMut::from(data).freeze() + if data.len() <= INLINE_CAP { + Bytes { + inner: Inner::from_slice_inline(data), + } + } else { + Bytes { + inner: BytesMut::copy_from_slice_in(data, PoolId::DEFAULT.pool_ref()) + .inner, + } + } + } + + /// Creates `Bytes` instance from slice, by copying it. + pub fn copy_from_slice_in(data: &[u8], pool: T) -> Self { + if data.len() <= INLINE_CAP { + Bytes { + inner: Inner::from_slice_inline(data), + } + } else { + Bytes { + inner: BytesMut::copy_from_slice_in(data, pool.pool_ref()).inner, + } + } } /// Returns a slice of self for the provided range. @@ -548,7 +528,9 @@ impl Bytes { assert!(end <= len); if end - begin <= INLINE_CAP { - return Bytes::copy_from_slice(&self[begin..end]); + return Bytes { + inner: Inner::from_slice_inline(&self[begin..end]), + }; } let mut ret = self.clone(); @@ -636,7 +618,7 @@ impl Bytes { } Bytes { - inner: self.inner.split_off(at), + inner: self.inner.split_off(at, true), } } @@ -675,7 +657,7 @@ impl Bytes { } Bytes { - inner: self.inner.split_to(at), + inner: self.inner.split_to(at, true), } } @@ -701,7 +683,7 @@ impl Bytes { /// [`split_off`]: #method.split_off #[inline] pub fn truncate(&mut self, len: usize) { - self.inner.truncate(len); + self.inner.truncate(len, true); } /// Shortens the buffer to `len` bytes and dropping the rest. @@ -722,12 +704,20 @@ impl Bytes { let kind = self.inner.kind(); // trim down only if buffer is not inline or static and - // buffer cap is greater than 64 bytes + // buffer's unused space is greater than 64 bytes if !(kind == KIND_INLINE || kind == KIND_STATIC) && (self.inner.capacity() - self.inner.len() >= 64) { - let bytes = Bytes::copy_from_slice(self); - let _ = mem::replace(self, bytes); + *self = if self.len() <= INLINE_CAP { + Bytes { + inner: Inner::from_slice_inline(self), + } + } else { + Bytes { + inner: BytesMut::copy_from_slice_in_priv(self, self.inner.pool()) + .inner, + } + }; } } @@ -744,7 +734,7 @@ impl Bytes { /// ``` #[inline] pub fn clear(&mut self) { - self.truncate(0); + self.inner = Inner::from_static(EMPTY); } /// Attempts to convert into a `BytesMut` handle. @@ -783,92 +773,6 @@ impl Bytes { } } - /// Acquires a mutable reference to the owned form of the data. - /// - /// Clones the data if it is not already owned. - pub fn to_mut(&mut self) -> &mut BytesMut { - if !self.inner.is_mut_safe() { - let new = Self::copy_from_slice(&self[..]); - *self = new; - } - unsafe { &mut *(self as *mut Bytes as *mut BytesMut) } - } - - /// Appends given bytes to this object. 
- /// - /// If this `Bytes` object has not enough capacity, it is resized first. - /// If it is shared (`refcount > 1`), it is copied first. - /// - /// This operation can be less effective than the similar operation on - /// `BytesMut`, especially on small additions. - /// - /// # Examples - /// - /// ``` - /// use ntex_bytes::Bytes; - /// - /// let mut buf = Bytes::from("aabb"); - /// buf.extend_from_slice(b"ccdd"); - /// buf.extend_from_slice(b"eeff"); - /// - /// assert_eq!(b"aabbccddeeff", &buf[..]); - /// ``` - pub fn extend_from_slice(&mut self, extend: &[u8]) { - if extend.is_empty() { - return; - } - - let new_cap = self - .len() - .checked_add(extend.len()) - .expect("capacity overflow"); - - let result = match mem::replace(self, Bytes::new()).try_mut() { - Ok(mut bytes_mut) => { - bytes_mut.extend_from_slice(extend); - bytes_mut - } - Err(bytes) => { - let mut bytes_mut = BytesMut::with_capacity(new_cap); - bytes_mut.put_slice(&bytes); - bytes_mut.put_slice(extend); - bytes_mut - } - }; - - let _ = mem::replace(self, result.freeze()); - } - - /// Combine splitted Bytes objects back as contiguous. - /// - /// If `Bytes` objects were not contiguous originally, they will be extended. - /// - /// # Examples - /// - /// ``` - /// use ntex_bytes::Bytes; - /// - /// let mut buf = Bytes::with_capacity(64); - /// buf.extend_from_slice(b"aaabbbcccddd"); - /// - /// let splitted = buf.split_off(6); - /// assert_eq!(b"aaabbb", &buf[..]); - /// assert_eq!(b"cccddd", &splitted[..]); - /// - /// buf.unsplit(splitted); - /// assert_eq!(b"aaabbbcccddd", &buf[..]); - /// ``` - pub fn unsplit(&mut self, other: Bytes) { - if self.is_empty() { - *self = other; - return; - } - - if let Err(other_inner) = self.inner.try_unsplit(other.inner) { - self.extend_from_slice(other_inner.as_ref()); - } - } - /// Returns an iterator over the bytes contained by the buffer. 
/// /// # Examples @@ -938,7 +842,7 @@ impl bytes::buf::Buf for Bytes { impl Clone for Bytes { fn clone(&self) -> Bytes { Bytes { - inner: unsafe { self.inner.shallow_clone(false) }, + inner: unsafe { self.inner.shallow_clone() }, } } } @@ -974,6 +878,10 @@ impl From> for Bytes { fn from(src: Vec) -> Bytes { if src.is_empty() { Bytes::new() + } else if src.len() <= INLINE_CAP { + Bytes { + inner: Inner::from_slice_inline(&src), + } } else { BytesMut::from(src).freeze() } @@ -982,7 +890,15 @@ impl From> for Bytes { impl From for Bytes { fn from(src: String) -> Bytes { - BytesMut::from(src).freeze() + if src.is_empty() { + Bytes::new() + } else if src.bytes().len() <= INLINE_CAP { + Bytes { + inner: Inner::from_slice_inline(src.as_bytes()), + } + } else { + BytesMut::from(src).freeze() + } } } @@ -1004,7 +920,6 @@ impl FromIterator for BytesMut { let (min, maybe_max) = iter.size_hint(); let mut out = BytesMut::with_capacity(maybe_max.unwrap_or(min)); - for i in iter { out.reserve(1); out.put_u8(i); @@ -1099,44 +1014,6 @@ impl<'a> IntoIterator for &'a Bytes { } } -impl Extend for Bytes { - fn extend(&mut self, iter: T) - where - T: IntoIterator, - { - let iter = iter.into_iter(); - - let (lower, upper) = iter.size_hint(); - - // Avoid possible conversion into mut if there's nothing to add - if let Some(0) = upper { - return; - } - - let mut bytes_mut = match mem::replace(self, Bytes::new()).try_mut() { - Ok(bytes_mut) => bytes_mut, - Err(bytes) => { - let mut bytes_mut = BytesMut::with_capacity(bytes.len() + lower); - bytes_mut.put_slice(&bytes); - bytes_mut - } - }; - - bytes_mut.extend(iter); - - let _ = mem::replace(self, bytes_mut.freeze()); - } -} - -impl<'a> Extend<&'a u8> for Bytes { - fn extend(&mut self, iter: T) - where - T: IntoIterator, - { - self.extend(iter.into_iter().copied()) - } -} - /* * * ===== BytesMut ===== @@ -1153,6 +1030,11 @@ impl BytesMut { /// It is important to note that this function does not specify the length /// of the returned `BytesMut`, but only the capacity. /// + /// # Panics + /// + /// Panics if `capacity` greater than 60bit for 64bit systems + /// and 28bit for 32bit systems + /// /// # Examples /// /// ``` @@ -1169,8 +1051,58 @@ impl BytesMut { /// ``` #[inline] pub fn with_capacity(capacity: usize) -> BytesMut { + Self::with_capacity_in(capacity, PoolId::DEFAULT.pool_ref()) + } + + /// Creates a new `BytesMut` with the specified capacity and in specified memory pool. + /// + /// # Examples + /// + /// ``` + /// use ntex_bytes::{BytesMut, BufMut, PoolId}; + /// + /// let mut bytes = BytesMut::with_capacity_in(64, PoolId::P1); + /// + /// // `bytes` contains no data, even though there is capacity + /// assert_eq!(bytes.len(), 0); + /// + /// bytes.put(&b"hello world"[..]); + /// + /// assert_eq!(&bytes[..], b"hello world"); + /// assert!(PoolId::P1.pool_ref().allocated() > 0); + /// ``` + #[inline] + pub fn with_capacity_in(capacity: usize, pool: T) -> BytesMut { BytesMut { - inner: Inner::with_capacity(capacity), + inner: Inner::with_capacity(capacity, pool.pool_ref()), + } + } + + #[inline] + pub(crate) fn with_capacity_in_priv(capacity: usize, pool: PoolRef) -> BytesMut { + BytesMut { + inner: Inner::with_capacity(capacity, pool), + } + } + + /// Creates a new `BytesMut` from slice, by copying it. 
+ pub fn copy_from_slice_in(src: &[u8], pool: T) -> Self { + let mut bytes = BytesMut::with_capacity_in(src.len(), pool.pool_ref()); + bytes.extend_from_slice(src); + bytes + } + + fn copy_from_slice_in_priv(src: &[u8], pool: PoolRef) -> Self { + let mut bytes = BytesMut::with_capacity_in_priv(src.len(), pool); + bytes.extend_from_slice(src); + bytes + } + + #[inline] + /// Convert a `Vec` into a `BytesMut` + pub fn from_vec(src: Vec, pool: T) -> BytesMut { + BytesMut { + inner: Inner::from_vec(src, pool.pool_ref()), } } @@ -1195,7 +1127,7 @@ impl BytesMut { /// ``` #[inline] pub fn new() -> BytesMut { - BytesMut::with_capacity(0) + BytesMut::with_capacity(MIN_NON_ZERO_CAP) } /// Returns the number of bytes contained in this `BytesMut`. @@ -1228,20 +1160,6 @@ impl BytesMut { self.inner.is_empty() } - /// Return true if the `BytesMut` uses inline allocation - /// - /// # Examples - /// ``` - /// use ntex_bytes::BytesMut; - /// - /// assert!(BytesMut::with_capacity(4).is_inline()); - /// assert!(!BytesMut::from(Vec::with_capacity(4)).is_inline()); - /// assert!(!BytesMut::with_capacity(1024).is_inline()); - /// ``` - pub fn is_inline(&self) -> bool { - self.inner.is_inline() - } - /// Returns the number of bytes the `BytesMut` can hold without reallocating. /// /// # Examples @@ -1283,7 +1201,13 @@ impl BytesMut { /// ``` #[inline] pub fn freeze(self) -> Bytes { - Bytes { inner: self.inner } + if self.inner.len() <= INLINE_CAP { + Bytes { + inner: self.inner.to_inline(), + } + } else { + Bytes { inner: self.inner } + } } /// Splits the bytes into two at the given index. @@ -1314,7 +1238,7 @@ impl BytesMut { /// Panics if `at > capacity`. pub fn split_off(&mut self, at: usize) -> BytesMut { BytesMut { - inner: self.inner.split_off(at), + inner: self.inner.split_off(at, false), } } @@ -1378,7 +1302,7 @@ impl BytesMut { assert!(at <= self.len()); BytesMut { - inner: self.inner.split_to(at), + inner: self.inner.split_to(at, false), } } @@ -1403,7 +1327,7 @@ impl BytesMut { /// /// [`split_off`]: #method.split_off pub fn truncate(&mut self, len: usize) { - self.inner.truncate(len); + self.inner.truncate(len, false); } /// Clears the buffer, removing all data. @@ -1427,6 +1351,11 @@ impl BytesMut { /// difference with each additional byte set to `value`. If `new_len` is /// less than `len`, the buffer is simply truncated. /// + /// # Panics + /// + /// Panics if `new_len` greater than 60bit for 64bit systems + /// and 28bit for 32bit systems + /// /// # Examples /// /// ``` @@ -1497,6 +1426,11 @@ impl BytesMut { /// buffer's capacity, then the current view will be copied to the front of /// the buffer and the handle will take ownership of the full buffer. /// + /// # Panics + /// + /// Panics if new capacity is greater than 60bit for 64bit systems + /// and 28bit for 32bit systems + /// /// # Examples /// /// In the following example, a new buffer is allocated. @@ -1544,7 +1478,7 @@ impl BytesMut { return; } - self.inner.reserve_inner(additional) + self.inner.reserve_inner(additional); } /// Appends given bytes to this object. @@ -1568,36 +1502,6 @@ impl BytesMut { self.put_slice(extend); } - /// Combine splitted BytesMut objects back as contiguous. - /// - /// If `BytesMut` objects were not contiguous originally, they will be extended. 
- /// - /// # Examples - /// - /// ``` - /// use ntex_bytes::BytesMut; - /// - /// let mut buf = BytesMut::with_capacity(64); - /// buf.extend_from_slice(b"aaabbbcccddd"); - /// - /// let splitted = buf.split_off(6); - /// assert_eq!(b"aaabbb", &buf[..]); - /// assert_eq!(b"cccddd", &splitted[..]); - /// - /// buf.unsplit(splitted); - /// assert_eq!(b"aaabbbcccddd", &buf[..]); - /// ``` - pub fn unsplit(&mut self, other: BytesMut) { - if self.is_empty() { - *self = other; - return; - } - - if let Err(other_inner) = self.inner.try_unsplit(other.inner) { - self.extend_from_slice(other_inner.as_ref()); - } - } - /// Returns an iterator over the bytes contained by the buffer. /// /// # Examples @@ -1617,6 +1521,10 @@ impl BytesMut { pub fn iter(&'_ self) -> std::slice::Iter<'_, u8> { self.chunk().iter() } + + pub(crate) fn move_to_pool(&mut self, pool: PoolRef) { + self.inner.move_to_pool(pool); + } } impl Buf for BytesMut { @@ -1727,23 +1635,21 @@ impl DerefMut for BytesMut { } impl From> for BytesMut { + #[inline] /// Convert a `Vec` into a `BytesMut` /// /// This constructor may be used to avoid the inlining optimization used by /// `with_capacity`. A `BytesMut` constructed this way will always store /// its data on the heap. - #[inline] fn from(src: Vec) -> BytesMut { - BytesMut { - inner: Inner::from_vec(src), - } + BytesMut::from_vec(src, PoolId::DEFAULT.pool_ref()) } } impl From for BytesMut { #[inline] fn from(src: String) -> BytesMut { - BytesMut::from(src.into_bytes()) + BytesMut::from_vec(src.into_bytes(), PoolId::DEFAULT.pool_ref()) } } @@ -1753,20 +1659,8 @@ impl<'a> From<&'a [u8]> for BytesMut { if len == 0 { BytesMut::new() - } else if len <= INLINE_CAP { - unsafe { - #[allow(invalid_value, clippy::uninit_assumed_init)] - let mut inner: Inner = mem::MaybeUninit::uninit().assume_init(); - - // Set inline mask - inner.arc = NonNull::new_unchecked(KIND_INLINE as *mut Shared); - inner.set_inline_len(len); - inner.as_raw()[0..len].copy_from_slice(src); - - BytesMut { inner } - } } else { - BytesMut::from(src.to_vec()) + BytesMut::copy_from_slice_in(src, PoolId::DEFAULT.pool_ref()) } } } @@ -1781,7 +1675,9 @@ impl<'a> From<&'a str> for BytesMut { impl From for BytesMut { #[inline] fn from(src: Bytes) -> BytesMut { - src.try_mut().unwrap_or_else(|src| BytesMut::from(&src[..])) + src.try_mut().unwrap_or_else(|src| { + BytesMut::copy_from_slice_in_priv(&src[..], src.inner.pool()) + }) } } @@ -1866,7 +1762,9 @@ impl fmt::Write for BytesMut { impl Clone for BytesMut { #[inline] fn clone(&self) -> BytesMut { - BytesMut::from(&self[..]) + BytesMut { + inner: unsafe { self.inner.shallow_clone() }, + } } } @@ -1936,36 +1834,133 @@ impl Inner { } #[inline] - fn from_vec(mut src: Vec) -> Inner { - let len = src.len(); - let cap = src.capacity(); - let ptr = src.as_mut_ptr(); - - mem::forget(src); - - let original_capacity_repr = original_capacity_to_repr(cap); - let arc = (original_capacity_repr << ORIGINAL_CAPACITY_OFFSET) | KIND_VEC; - + const fn empty() -> Inner { Inner { - arc: unsafe { NonNull::new_unchecked(arc as *mut Shared) }, - ptr, - len, - cap, + arc: unsafe { NonNull::new_unchecked(KIND_INLINE as *mut Shared) }, + ptr: 0 as *mut u8, + len: 0, + cap: 0, } } #[inline] - fn with_capacity(capacity: usize) -> Inner { - if capacity <= INLINE_CAP { - unsafe { - // Using uninitialized memory is ~30% faster - #[allow(invalid_value, clippy::uninit_assumed_init)] - let mut inner: Inner = mem::MaybeUninit::uninit().assume_init(); - inner.arc = NonNull::new_unchecked(KIND_INLINE as *mut Shared); 
- inner + fn from_vec(mut vec: Vec, pool: PoolRef) -> Inner { + let len = vec.len(); + let cap = vec.capacity(); + let ptr = vec.as_mut_ptr(); + pool.acquire(cap); + + // Store data in arc + let shared = Box::into_raw(Box::new(Shared { + vec, + pool, + ref_count: AtomicUsize::new(1), + })); + + // The pointer should be aligned, so this assert should always succeed. + debug_assert!(0 == (shared as usize & KIND_MASK)); + + // Create new arc, so atomic operations can be avoided. + Inner { + ptr, + len, + cap, + arc: unsafe { NonNull::new_unchecked(shared) }, + } + } + + #[inline] + fn with_capacity(capacity: usize, pool: PoolRef) -> Inner { + Inner::from_slice(capacity, &[], pool) + } + + #[inline] + fn from_slice(cap: usize, src: &[u8], pool: PoolRef) -> Inner { + // Store data in vec + let mut vec = Vec::with_capacity(cap + SHARED_VEC_SIZE); + unsafe { + #![allow(clippy::uninit_vec)] + vec.set_len(SHARED_VEC_SIZE); + vec.extend_from_slice(src); + + let len = src.len(); + let full_cap = vec.capacity(); + let cap = full_cap - SHARED_VEC_SIZE; + let ptr = vec.as_mut_ptr(); + mem::forget(vec); + pool.acquire(full_cap); + + let shared_vec_ptr = ptr as *mut SharedVec; + ptr::write( + shared_vec_ptr, + SharedVec { + pool, + cap: full_cap, + ref_count: AtomicUsize::new(1), + }, + ); + + // Create new arc, so atomic operations can be avoided. + Inner { + len, + cap, + ptr: ptr.add(SHARED_VEC_SIZE), + arc: NonNull::new_unchecked((ptr as usize ^ KIND_VEC) as *mut Shared), } + } + } + + #[inline] + fn from_slice_inline(src: &[u8]) -> Inner { + unsafe { Inner::from_ptr_inline(src.as_ptr(), src.len()) } + } + + #[inline] + unsafe fn from_ptr_inline(src: *const u8, len: usize) -> Inner { + // Using uninitialized memory is ~30% faster + #[allow(invalid_value, clippy::uninit_assumed_init)] + let mut inner: Inner = mem::MaybeUninit::uninit().assume_init(); + inner.arc = NonNull::new_unchecked(KIND_INLINE as *mut Shared); + + let dst = inner.inline_ptr(); + ptr::copy(src, dst, len); + inner.set_inline_len(len); + inner + } + + #[inline] + fn pool(&self) -> PoolRef { + let kind = self.kind(); + + if kind == KIND_VEC { + unsafe { (*self.shared_vec()).pool } + } else if kind == KIND_ARC { + unsafe { (*self.arc.as_ptr()).pool } } else { - Inner::from_vec(Vec::with_capacity(capacity)) + PoolId::DEFAULT.pool_ref() + } + } + + #[inline] + fn move_to_pool(&mut self, pool: PoolRef) { + let kind = self.kind(); + + if kind == KIND_VEC { + let vec = self.shared_vec(); + unsafe { + let cap = (*vec).cap; + pool.acquire(cap); + let pool = mem::replace(&mut (*vec).pool, pool); + pool.release(cap); + } + } else if kind == KIND_ARC { + let arc = self.arc.as_ptr(); + unsafe { + let cap = (*arc).vec.capacity(); + pool.acquire(cap); + let pool = mem::replace(&mut (*arc).pool, pool); + pool.release(cap); + } } } @@ -2008,6 +2003,16 @@ impl Inner { } } + /// Return a raw pointer to data + #[inline] + unsafe fn as_ptr(&mut self) -> *mut u8 { + if self.is_inline() { + self.inline_ptr() + } else { + self.ptr + } + } + /// Insert a byte into the next slot and advance the len by 1. 
#[inline] fn put_u8(&mut self, n: u8) { @@ -2042,6 +2047,20 @@ impl Inner { (self as *const Inner as *mut Inner as *mut u8).offset(INLINE_DATA_OFFSET) } + #[inline] + fn to_inline(&self) -> Inner { + unsafe { + // Using uninitialized memory is ~30% faster + #[allow(invalid_value, clippy::uninit_assumed_init)] + let mut inner: Inner = mem::MaybeUninit::uninit().assume_init(); + inner.arc = NonNull::new_unchecked(KIND_INLINE as *mut Shared); + let len = self.len(); + inner.as_raw()[..len].copy_from_slice(self.as_ref()); + inner.set_inline_len(len); + inner + } + } + #[inline] fn inline_len(&self) -> usize { // This is undefind behavior due to a data race, but experimental @@ -2089,54 +2108,57 @@ impl Inner { } } - fn split_off(&mut self, at: usize) -> Inner { - let mut other = unsafe { self.shallow_clone(true) }; - + fn split_off(&mut self, at: usize, create_inline: bool) -> Inner { + let other = unsafe { + if create_inline && self.len() - at <= INLINE_CAP { + Inner::from_ptr_inline(self.as_ptr().add(at), self.len() - at) + } else { + let mut other = self.shallow_clone(); + other.set_start(at); + other + } + }; unsafe { - other.set_start(at); - self.set_end(at); - } - - other - } - - fn split_to(&mut self, at: usize) -> Inner { - let mut other = unsafe { self.shallow_clone(true) }; - - unsafe { - other.set_end(at); - self.set_start(at); - } - - other - } - - fn truncate(&mut self, len: usize) { - if len <= self.len() { - unsafe { - self.set_len(len); + if create_inline && at <= INLINE_CAP { + *self = Inner::from_ptr_inline(self.as_ptr(), at); + } else { + self.set_end(at); } } + + other } - fn try_unsplit(&mut self, other: Inner) -> Result<(), Inner> { - let ptr; - - if other.is_empty() { - return Ok(()); - } - + fn split_to(&mut self, at: usize, create_inline: bool) -> Inner { + let other = unsafe { + if create_inline && at <= INLINE_CAP { + Inner::from_ptr_inline(self.as_ptr(), at) + } else { + let mut other = self.shallow_clone(); + other.set_end(at); + other + } + }; unsafe { - ptr = self.ptr.add(self.len); + if create_inline && self.len() - at <= INLINE_CAP { + *self = Inner::from_ptr_inline(self.as_ptr().add(at), self.len() - at); + } else { + self.set_start(at); + } } - if ptr == other.ptr && self.kind() == KIND_ARC && other.kind() == KIND_ARC { - // debug_assert_eq!(self.arc.load(Acquire), other.arc.load(Acquire)); - // Contiguous blocks, just combine directly - self.len += other.len; - self.cap += other.cap; - Ok(()) - } else { - Err(other) + + other + } + + fn truncate(&mut self, len: usize, create_inline: bool) { + unsafe { + if len <= self.len() { + if create_inline && len < INLINE_CAP { + *self = Inner::from_ptr_inline(self.as_ptr(), len); + } else { + self.set_len(len); + } + } } } @@ -2151,7 +2173,7 @@ impl Inner { self.set_len(new_len); } } else { - self.truncate(new_len); + self.truncate(new_len, false); } } @@ -2170,7 +2192,6 @@ impl Inner { assert!(start <= INLINE_CAP); let len = self.inline_len(); - if len <= start { self.set_inline_len(0); } else { @@ -2190,25 +2211,6 @@ impl Inner { } else { assert!(start <= self.cap); - if kind == KIND_VEC { - // Setting the start when in vec representation is a little more - // complicated. First, we have to track how far ahead the - // "start" of the byte buffer from the beginning of the vec. We - // also have to ensure that we don't exceed the maximum shift. 
- let (mut pos, prev) = self.uncoordinated_get_vec_pos(); - pos += start; - - if pos <= MAX_VEC_POS { - self.uncoordinated_set_vec_pos(pos, prev); - } else { - // The repr must be upgraded to ARC. This will never happen - // on 64 bit systems and will only happen on 32 bit systems - // when shifting past 134,217,727 bytes. As such, we don't - // worry too much about performance here. - let _ = self.shallow_clone(true); - } - } - // Updating the start of the view is setting `ptr` to point to the // new start and updating the `len` field to reflect the new length // of the view. @@ -2225,8 +2227,6 @@ impl Inner { } unsafe fn set_end(&mut self, end: usize) { - debug_assert!(self.is_shared()); - // Always check `inline` first, because if the handle is using inline // data storage, all of the `Inner` struct fields will be gibberish. if self.is_inline() { @@ -2247,12 +2247,16 @@ impl Inner { // Always check `inline` first, because if the handle is using inline // data storage, all of the `Inner` struct fields will be gibberish. - if kind == KIND_INLINE || kind == KIND_VEC { + if kind == KIND_INLINE { // Inlined buffers can always be mutated as the data is never shared // across handles. true } else if kind == KIND_STATIC { false + } else if kind == KIND_VEC { + // Otherwise, the underlying buffer is potentially shared with other + // handles, so the ref_count needs to be checked. + unsafe { (*self.shared_vec()).is_unique() } } else { // Otherwise, the underlying buffer is potentially shared with other // handles, so the ref_count needs to be checked. @@ -2270,7 +2274,7 @@ impl Inner { /// the same byte window. /// /// This function is thread safe. - unsafe fn shallow_clone(&self, mut_self: bool) -> Inner { + unsafe fn shallow_clone(&self) -> Inner { // Always check `inline` first, because if the handle is using inline // data storage, all of the `Inner` struct fields will be gibberish. // @@ -2283,12 +2287,12 @@ impl Inner { ptr::copy_nonoverlapping(self, inner.as_mut_ptr(), 1); inner.assume_init() } else { - self.shallow_clone_sync(mut_self) + self.shallow_clone_sync() } } #[cold] - unsafe fn shallow_clone_sync(&self, mut_self: bool) -> Inner { + unsafe fn shallow_clone_sync(&self) -> Inner { // The function requires `&self`, this means that `shallow_clone` // could be called concurrently. // @@ -2297,111 +2301,31 @@ impl Inner { // `compare_and_swap` that comes later in this function. The goal is // to ensure that if `arc` is currently set to point to a `Shared`, // that the current thread acquires the associated memory. - let slf_arc: &AtomicPtr = mem::transmute(&self.arc); - let arc = slf_arc.load(Acquire); + let arc: *mut Shared = self.arc.as_ptr(); let kind = arc as usize & KIND_MASK; if kind == KIND_ARC { - self.shallow_clone_arc(arc) + let old_size = (*arc).ref_count.fetch_add(1, Relaxed); + if old_size == usize::MAX { + abort(); + } + + Inner { + arc: NonNull::new_unchecked(arc), + ..*self + } } else { assert!(kind == KIND_VEC); - self.shallow_clone_vec(arc as usize, mut_self) - } - } - unsafe fn shallow_clone_arc(&self, arc: *mut Shared) -> Inner { - debug_assert!(arc as usize & KIND_MASK == KIND_ARC); - - let old_size = (*arc).ref_count.fetch_add(1, Relaxed); - - if old_size == usize::MAX { - abort(); - } - - Inner { - arc: NonNull::new_unchecked(arc), - ..*self - } - } - - #[cold] - unsafe fn shallow_clone_vec(&self, arc: usize, mut_self: bool) -> Inner { - // If the buffer is still tracked in a `Vec`. It is time to - // promote the vec to an `Arc`. 
This could potentially be called - // concurrently, so some care must be taken. - - debug_assert!(arc & KIND_MASK == KIND_VEC); - - let original_capacity_repr = - (arc as usize & ORIGINAL_CAPACITY_MASK) >> ORIGINAL_CAPACITY_OFFSET; - - // The vec offset cannot be concurrently mutated, so there - // should be no danger reading it. - let off = (arc as usize) >> VEC_POS_OFFSET; - - // First, allocate a new `Shared` instance containing the - // `Vec` fields. It's important to note that `ptr`, `len`, - // and `cap` cannot be mutated without having `&mut self`. - // This means that these fields will not be concurrently - // updated and since the buffer hasn't been promoted to an - // `Arc`, those three fields still are the components of the - // vector. - let shared = Box::new(Shared { - vec: rebuild_vec(self.ptr, self.len, self.cap, off), - original_capacity_repr, - // Initialize refcount to 2. One for this reference, and one - // for the new clone that will be returned from - // `shallow_clone`. - ref_count: AtomicUsize::new(2), - }); - - let shared = Box::into_raw(shared); - - // The pointer should be aligned, so this assert should - // always succeed. - debug_assert!(0 == (shared as usize & KIND_MASK)); - - // If there are no references to self in other threads, - // expensive atomic operations can be avoided. - if mut_self { - let slf_arc: &AtomicPtr = mem::transmute(&self.arc); - slf_arc.store(shared, Relaxed); - return Inner { - arc: NonNull::new_unchecked(shared), - ..*self - }; - } - - // Try compare & swapping the pointer into the `arc` field. - // `Release` is used synchronize with other threads that - // will load the `arc` field. - // - // If the `compare_exchange` fails, then the thread lost the - // race to promote the buffer to shared. The `Acquire` - // ordering will synchronize with the `compare_and_swap` - // that happened in the other thread and the `Shared` - // pointed to by `actual` will be visible. - let slf_arc: &AtomicPtr = mem::transmute(&self.arc); - match slf_arc.compare_exchange(arc as *mut Shared, shared, AcqRel, Acquire) { - Ok(actual) => { - debug_assert!(actual as usize == arc); - // The upgrade was successful, the new handle can be - // returned. - Inner { - arc: NonNull::new_unchecked(shared), - ..*self - } + let vec_arc = (arc as usize & KIND_UNMASK) as *mut SharedVec; + let old_size = (*vec_arc).ref_count.fetch_add(1, Relaxed); + if old_size == usize::MAX { + abort(); } - Err(actual) => { - // The upgrade failed, a concurrent clone happened. Release - // the allocation that was made in this thread, it will not - // be needed. - let shared = Box::from_raw(shared); - mem::forget(*shared); - // Buffer already promoted to shared storage, so increment ref - // count. 
- self.shallow_clone_arc(actual) + Inner { + arc: NonNull::new_unchecked(arc as *mut Shared), + ..*self } } } @@ -2433,134 +2357,64 @@ impl Inner { let new_cap = len + additional; // Promote to a vector - let mut v = Vec::with_capacity(new_cap); - v.extend_from_slice(self.as_ref()); - - self.ptr = v.as_mut_ptr(); - self.len = v.len(); - self.cap = v.capacity(); - - // Since the minimum capacity is `INLINE_CAP`, don't bother encoding - // the original capacity as INLINE_CAP - self.arc = unsafe { NonNull::new_unchecked(KIND_VEC as *mut Shared) }; - - mem::forget(v); + *self = + Inner::from_slice(new_cap, self.as_ref(), PoolId::DEFAULT.pool_ref()); return; } - if kind == KIND_VEC { - // If there's enough free space before the start of the buffer, then - // just copy the data backwards and reuse the already-allocated - // space. - // - // Otherwise, since backed by a vector, use `Vec::reserve` - unsafe { - let (off, prev) = self.uncoordinated_get_vec_pos(); - - // Only reuse space if we stand to gain at least capacity/2 - // bytes of space back - if off >= additional && off >= (self.cap / 2) { - // There's space - reuse it - // - // Just move the pointer back to the start after copying - // data back. - let base_ptr = self.ptr.offset(-(off as isize)); - ptr::copy(self.ptr, base_ptr, self.len); - self.ptr = base_ptr; - self.uncoordinated_set_vec_pos(0, prev); - - // Length stays constant, but since we moved backwards we - // can gain capacity back. - self.cap += off; - } else { - // No space - allocate more - let mut v = rebuild_vec(self.ptr, self.len, self.cap, off); - v.reserve(additional); - - // Update the info - self.ptr = v.as_mut_ptr().add(off); - self.len = v.len() - off; - self.cap = v.capacity() - off; - - // Drop the vec reference - mem::forget(v); - } - return; - } - } - - let arc = self.arc.as_ptr(); - - debug_assert!(kind == KIND_ARC); - // Reserving involves abandoning the currently shared buffer and // allocating a new vector with the requested capacity. - // - // Compute the new capacity - let mut new_cap = len + additional; - let original_capacity; - let original_capacity_repr; + let new_cap = len + additional; - unsafe { - original_capacity_repr = (*arc).original_capacity_repr; - original_capacity = original_capacity_from_repr(original_capacity_repr); + if kind == KIND_VEC { + let vec = self.shared_vec(); - // First, try to reclaim the buffer. This is possible if the current - // handle is the only outstanding handle pointing to the buffer. - if (*arc).is_unique() { - // This is the only handle to the buffer. It can be reclaimed. - // However, before doing the work of copying data, check to make - // sure that the vector has enough capacity. - let v = &mut (*arc).vec; + unsafe { + let vec_cap = (*vec).cap - SHARED_VEC_SIZE; - if v.capacity() >= new_cap { + // First, try to reclaim the buffer. This is possible if the current + // handle is the only outstanding handle pointing to the buffer. + if (*vec).is_unique() && vec_cap >= new_cap { // The capacity is sufficient, reclaim the buffer - let ptr = v.as_mut_ptr(); - + let ptr = (vec as *mut u8).add(SHARED_VEC_SIZE); ptr::copy(self.ptr, ptr, len); self.ptr = ptr; - self.cap = v.capacity(); + self.cap = vec_cap; + } else { + // Create a new vector storage + *self = Inner::from_slice(new_cap, self.as_ref(), (*vec).pool); + } + } + } else { + debug_assert!(kind == KIND_ARC); - return; + let arc = self.arc.as_ptr(); + unsafe { + // First, try to reclaim the buffer. 
This is possible if the current + // handle is the only outstanding handle pointing to the buffer. + if (*arc).is_unique() { + // This is the only handle to the buffer. It can be reclaimed. + // However, before doing the work of copying data, check to make + // sure that the vector has enough capacity. + let v = &mut (*arc).vec; + + if v.capacity() >= new_cap { + // The capacity is sufficient, reclaim the buffer + let ptr = v.as_mut_ptr(); + + ptr::copy(self.ptr, ptr, len); + + self.ptr = ptr; + self.cap = v.capacity(); + return; + } } - // The vector capacity is not sufficient. The reserve request is - // asking for more than the initial buffer capacity. Allocate more - // than requested if `new_cap` is not much bigger than the current - // capacity. - // - // There are some situations, using `reserve_exact` that the - // buffer capacity could be below `original_capacity`, so do a - // check. - new_cap = - cmp::max(cmp::max(v.capacity() << 1, new_cap), original_capacity); - } else { - new_cap = cmp::max(new_cap, original_capacity); + // Create a new vector storage + *self = Inner::from_slice(new_cap, self.as_ref(), (*arc).pool); } } - - // Create a new vector to store the data - let mut v = Vec::with_capacity(new_cap); - - // Copy the bytes - v.extend_from_slice(self.as_ref()); - - // Release the shared handle. This must be done *after* the bytes are - // copied. - release_shared(arc); - - // Update self - self.ptr = v.as_mut_ptr(); - self.len = v.len(); - self.cap = v.capacity(); - - let arc = (original_capacity_repr << ORIGINAL_CAPACITY_OFFSET) | KIND_VEC; - - self.arc = unsafe { NonNull::new_unchecked(arc as *mut Shared) }; - - // Forget the vector handle - mem::forget(v); } /// Returns true if the buffer is stored inline @@ -2581,19 +2435,17 @@ impl Inner { kind == KIND_INLINE || kind == KIND_STATIC } - /// Used for `debug_assert` statements. &mut is used to guarantee that it is - /// safe to check VEC_KIND - #[inline] - fn is_shared(&mut self) -> bool { - !matches!(self.kind(), KIND_VEC) - } - /// Used for `debug_assert` statements #[inline] fn is_static(&mut self) -> bool { matches!(self.kind(), KIND_STATIC) } + #[inline] + fn shared_vec(&self) -> *mut SharedVec { + ((self.arc.as_ptr() as usize) & KIND_UNMASK) as *mut SharedVec + } + #[inline] fn kind(&self) -> usize { // This function is going to probably raise some eyebrows. The function @@ -2635,39 +2487,6 @@ impl Inner { imp(self.arc.as_ptr()) } - - #[inline] - fn uncoordinated_get_vec_pos(&mut self) -> (usize, usize) { - // Similar to above, this is a pretty crazed function. This should only - // be called when in the KIND_VEC mode. This + the &mut self argument - // guarantees that there is no possibility of concurrent calls to this - // function. - let prev = self.arc.as_ptr() as usize; - - (prev >> VEC_POS_OFFSET, prev) - } - - #[inline] - fn uncoordinated_set_vec_pos(&mut self, pos: usize, prev: usize) { - // Once more... 
crazy - debug_assert!(pos <= MAX_VEC_POS); - - unsafe { - self.arc = NonNull::new_unchecked( - ((pos << VEC_POS_OFFSET) | (prev & NOT_VEC_POS_MASK)) as *mut Shared, - ); - } - } -} - -fn rebuild_vec(ptr: *mut u8, mut len: usize, mut cap: usize, off: usize) -> Vec { - unsafe { - let ptr = ptr.offset(-(off as isize)); - len += off; - cap += off; - - Vec::from_raw_parts(ptr, len, cap) - } } impl Drop for Inner { @@ -2675,10 +2494,7 @@ impl Drop for Inner { let kind = self.kind(); if kind == KIND_VEC { - let (off, _) = self.uncoordinated_get_vec_pos(); - - // Vector storage, free the vector - let _ = rebuild_vec(self.ptr, self.len, self.cap, off); + release_shared_vec(self.shared_vec()); } else if kind == KIND_ARC { release_shared(self.arc.as_ptr()); } @@ -2712,7 +2528,42 @@ fn release_shared(ptr: *mut Shared) { atomic::fence(Acquire); // Drop the data - Box::from_raw(ptr); + let arc = Box::from_raw(ptr); + arc.pool.release(arc.vec.capacity()); + } +} + +fn release_shared_vec(ptr: *mut SharedVec) { + // `Shared` storage... follow the drop steps from Arc. + unsafe { + if (*ptr).ref_count.fetch_sub(1, Release) != 1 { + return; + } + + // This fence is needed to prevent reordering of use of the data and + // deletion of the data. Because it is marked `Release`, the decreasing + // of the reference count synchronizes with this `Acquire` fence. This + // means that use of the data happens before decreasing the reference + // count, which happens before this fence, which happens before the + // deletion of the data. + // + // As explained in the [Boost documentation][1], + // + // > It is important to enforce any possible access to the object in one + // > thread (through an existing reference) to *happen before* deleting + // > the object in a different thread. This is achieved by a "release" + // > operation after dropping a reference (any access to the object + // > through this reference must obviously happened before), and an + // > "acquire" operation before deleting the object. 
+ // + // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) + atomic::fence(Acquire); + + // Drop the data + let cap = (*ptr).cap; + (*ptr).pool.release(cap); + ptr::drop_in_place(ptr); + Vec::from_raw_parts(ptr, 0, cap); } } @@ -2732,70 +2583,11 @@ impl Shared { } } -fn original_capacity_to_repr(cap: usize) -> usize { - let width = - PTR_WIDTH - ((cap >> MIN_ORIGINAL_CAPACITY_WIDTH).leading_zeros() as usize); - cmp::min( - width, - MAX_ORIGINAL_CAPACITY_WIDTH - MIN_ORIGINAL_CAPACITY_WIDTH, - ) -} - -fn original_capacity_from_repr(repr: usize) -> usize { - if repr == 0 { - return 0; +impl SharedVec { + fn is_unique(&self) -> bool { + // This is same as Shared::is_unique() but for KIND_VEC + self.ref_count.load(Acquire) == 1 } - - 1 << (repr + (MIN_ORIGINAL_CAPACITY_WIDTH - 1)) -} - -#[test] -fn test_original_capacity_to_repr() { - assert_eq!(original_capacity_to_repr(0), 0); - - let max_width = 32; - - for width in 1..(max_width + 1) { - let cap = 1 << width - 1; - - let expected = if width < MIN_ORIGINAL_CAPACITY_WIDTH { - 0 - } else if width < MAX_ORIGINAL_CAPACITY_WIDTH { - width - MIN_ORIGINAL_CAPACITY_WIDTH - } else { - MAX_ORIGINAL_CAPACITY_WIDTH - MIN_ORIGINAL_CAPACITY_WIDTH - }; - - assert_eq!(original_capacity_to_repr(cap), expected); - - if width > 1 { - assert_eq!(original_capacity_to_repr(cap + 1), expected); - } - - // MIN_ORIGINAL_CAPACITY_WIDTH must be bigger than 7 to pass tests below - if width == MIN_ORIGINAL_CAPACITY_WIDTH + 1 { - assert_eq!(original_capacity_to_repr(cap - 24), expected - 1); - assert_eq!(original_capacity_to_repr(cap + 76), expected); - } else if width == MIN_ORIGINAL_CAPACITY_WIDTH + 2 { - assert_eq!(original_capacity_to_repr(cap - 1), expected - 1); - assert_eq!(original_capacity_to_repr(cap - 48), expected - 1); - } - } -} - -#[test] -fn test_original_capacity_from_repr() { - assert_eq!(0, original_capacity_from_repr(0)); - - let min_cap = 1 << MIN_ORIGINAL_CAPACITY_WIDTH; - - assert_eq!(min_cap, original_capacity_from_repr(1)); - assert_eq!(min_cap * 2, original_capacity_from_repr(2)); - assert_eq!(min_cap * 4, original_capacity_from_repr(3)); - assert_eq!(min_cap * 8, original_capacity_from_repr(4)); - assert_eq!(min_cap * 16, original_capacity_from_repr(5)); - assert_eq!(min_cap * 32, original_capacity_from_repr(6)); - assert_eq!(min_cap * 64, original_capacity_from_repr(7)); } unsafe impl Send for Inner {} diff --git a/ntex-bytes/src/lib.rs b/ntex-bytes/src/lib.rs index d5324286..877b4700 100644 --- a/ntex-bytes/src/lib.rs +++ b/ntex-bytes/src/lib.rs @@ -52,8 +52,8 @@ #![deny( warnings, - missing_docs, - missing_debug_implementations, +// missing_docs, +// missing_debug_implementations, rust_2018_idioms )] #![doc(html_root_url = "https://docs.rs/ntex-bytes/")] @@ -64,9 +64,12 @@ pub use crate::buf::{Buf, BufMut}; mod bytes; mod debug; mod hex; +mod pool; +mod serde; mod string; pub use crate::bytes::{Bytes, BytesMut}; pub use crate::string::ByteString; -mod serde; +#[doc(hidden)] +pub use crate::pool::{Pool, PoolId, PoolRef}; diff --git a/ntex-bytes/src/pool.rs b/ntex-bytes/src/pool.rs new file mode 100644 index 00000000..ad563a76 --- /dev/null +++ b/ntex-bytes/src/pool.rs @@ -0,0 +1,777 @@ +#![allow(clippy::type_complexity)] +use std::sync::atomic::Ordering::{Relaxed, Release}; +use std::sync::atomic::{AtomicBool, AtomicUsize}; +use std::task::{Context, Poll, Waker}; +use std::{cell::Cell, cell::RefCell, fmt, future::Future, mem, pin::Pin, rc::Rc}; + +use futures_core::task::__internal::AtomicWaker; + +use crate::BytesMut; 
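The rest of this new module implements per-thread memory pools: every `BytesMut` is attached to one of 16 pools, and acquiring or releasing buffer capacity updates that pool's counter. A rough caller-side sketch, based on the doc examples added elsewhere in this diff and assuming nothing else on the thread has used pool `P1` (exact counter values depend on allocator rounding):

```rust
// Hedged usage sketch of the pool API (illustrative; assumes pool P1 is
// otherwise unused on this thread).
use ntex_bytes::{BufMut, BytesMut, PoolId};

fn main() {
    // Buffers created without an explicit pool go to PoolId::DEFAULT;
    // picking P1 here makes the accounting easy to observe.
    let pool = PoolId::P1.pool_ref();
    assert_eq!(pool.allocated(), 0);

    let mut buf = BytesMut::with_capacity_in(1024, PoolId::P1);
    buf.put(&b"hello world"[..]);

    // The pool tracks at least the requested capacity.
    assert!(pool.allocated() >= 1024);

    // Dropping the buffer gives the capacity back to the pool counter.
    drop(buf);
    assert_eq!(pool.allocated(), 0);
}
```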
+ +pub struct Pool { + idx: Cell, + inner: &'static MemoryPool, +} + +#[derive(Copy, Clone)] +pub struct PoolRef(&'static MemoryPool); + +#[derive(Copy, Clone, Debug)] +pub struct PoolId(u8); + +pub trait AsPoolRef { + fn pool_ref(&self) -> PoolRef; +} + +#[derive(Copy, Clone)] +pub struct BufParams { + pub high: u16, + pub low: u16, +} + +bitflags::bitflags! { + struct Flags: u8 { + const SPAWNED = 0b0000_0001; + const INCREASED = 0b0000_0010; + } +} + +struct MemoryPool { + id: PoolId, + waker: AtomicWaker, + waker_alive: AtomicBool, + waiters: RefCell, + flags: Cell, + + size: AtomicUsize, + max_size: Cell, + + window_h: Cell, + window_l: Cell, + window_idx: Cell, + window_waiters: Cell, + windows: Cell<[(usize, usize); 10]>, + + // io read/write bytesmut cache and params + read_wm: Cell, + read_cache: RefCell>, + write_wm: Cell, + write_cache: RefCell>, + + spawn: RefCell>>)>>>, +} + +const CACHE_SIZE: usize = 16; + +impl PoolId { + pub const P0: PoolId = PoolId(0); + pub const P1: PoolId = PoolId(1); + pub const P2: PoolId = PoolId(2); + pub const P3: PoolId = PoolId(3); + pub const P4: PoolId = PoolId(4); + pub const P5: PoolId = PoolId(5); + pub const P6: PoolId = PoolId(6); + pub const P7: PoolId = PoolId(7); + pub const P8: PoolId = PoolId(8); + pub const P9: PoolId = PoolId(9); + pub const P10: PoolId = PoolId(10); + pub const P11: PoolId = PoolId(11); + pub const P12: PoolId = PoolId(12); + pub const P13: PoolId = PoolId(13); + pub const P14: PoolId = PoolId(14); + pub const DEFAULT: PoolId = PoolId(15); + + #[inline] + pub fn pool(self) -> Pool { + POOLS.with(|pools| Pool { + idx: Cell::new(0), + inner: pools[self.0 as usize], + }) + } + + #[inline] + pub fn pool_ref(self) -> PoolRef { + POOLS.with(|pools| PoolRef(pools[self.0 as usize])) + } + + /// Set future spawn fn + pub fn set_spawn_fn(f: T) + where + T: Fn(Pin>>) + 'static, + { + let spawn: Rc>>)> = + Rc::new(move |fut| f(fut)); + + POOLS.with(move |pools| { + for pool in pools.iter().take(15) { + *pool.spawn.borrow_mut() = Some(spawn.clone()); + } + }); + } +} + +impl AsPoolRef for PoolId { + #[inline] + fn pool_ref(&self) -> PoolRef { + POOLS.with(|pools| PoolRef(pools[self.0 as usize])) + } +} + +thread_local! { + static POOLS: [&'static MemoryPool; 16] = [ + MemoryPool::create(PoolId::P0), + MemoryPool::create(PoolId::P1), + MemoryPool::create(PoolId::P2), + MemoryPool::create(PoolId::P3), + MemoryPool::create(PoolId::P4), + MemoryPool::create(PoolId::P5), + MemoryPool::create(PoolId::P6), + MemoryPool::create(PoolId::P7), + MemoryPool::create(PoolId::P8), + MemoryPool::create(PoolId::P9), + MemoryPool::create(PoolId::P10), + MemoryPool::create(PoolId::P11), + MemoryPool::create(PoolId::P12), + MemoryPool::create(PoolId::P13), + MemoryPool::create(PoolId::P14), + MemoryPool::create(PoolId::DEFAULT), + ]; +} + +impl PoolRef { + #[inline] + /// Get pool id. + pub fn id(self) -> PoolId { + self.0.id + } + + #[inline] + /// Get `Pool` instance for this pool ref. + pub fn pool(self) -> Pool { + Pool { + idx: Cell::new(0), + inner: self.0, + } + } + + #[inline] + /// Get total number of allocated bytes. + pub fn allocated(self) -> usize { + self.0.size.load(Relaxed) + } + + #[inline] + pub fn move_in(self, buf: &mut BytesMut) { + buf.move_to_pool(self); + } + + #[inline] + /// Creates a new `BytesMut` with the specified capacity. 
+ pub fn buf_with_capacity(self, cap: usize) -> BytesMut { + BytesMut::with_capacity_in_priv(cap, self) + } + + #[inline] + /// Set max pool size + pub fn set_pool_size(self, size: usize) -> Self { + self.0.max_size.set(size); + self.0.window_waiters.set(0); + self.0.window_l.set(size); + self.0.window_h.set(usize::MAX); + self.0.window_idx.set(0); + + let mut flags = self.0.flags.get(); + flags.insert(Flags::INCREASED); + self.0.flags.set(flags); + + // calc windows + let mut l = size; + let mut h = usize::MAX; + let mut windows: [(usize, usize); 10] = Default::default(); + windows[0] = (l, h); + + for (idx, item) in windows.iter_mut().enumerate().skip(1) { + h = l; + l = size - (size / 100) * idx; + *item = (l, h); + } + self.0.windows.set(windows); + + // release old waiters + let mut waiters = self.0.waiters.borrow_mut(); + while let Some(waker) = waiters.consume() { + waker.wake(); + } + + self + } + + #[doc(hidden)] + #[inline] + pub fn read_params(self) -> BufParams { + self.0.read_wm.get() + } + + #[doc(hidden)] + #[inline] + pub fn read_params_high(self) -> usize { + self.0.read_wm.get().high as usize + } + + #[doc(hidden)] + #[inline] + pub fn set_read_params(self, h: u16, l: u16) -> Self { + assert!(l < h); + self.0.read_wm.set(BufParams { high: h, low: l }); + self + } + + #[doc(hidden)] + #[inline] + pub fn write_params(self) -> BufParams { + self.0.write_wm.get() + } + + #[doc(hidden)] + #[inline] + pub fn write_params_high(self) -> usize { + self.0.write_wm.get().high as usize + } + + #[doc(hidden)] + #[inline] + pub fn set_write_params(self, h: u16, l: u16) -> Self { + assert!(l < h); + self.0.write_wm.set(BufParams { high: h, low: l }); + self + } + + #[doc(hidden)] + #[inline] + pub fn get_read_buf(self) -> BytesMut { + if let Some(buf) = self.0.read_cache.borrow_mut().pop() { + buf + } else { + BytesMut::with_capacity_in_priv(self.0.read_wm.get().high as usize, self) + } + } + + #[doc(hidden)] + #[inline] + /// Release read buffer, buf must be allocated from this pool + pub fn release_read_buf(self, mut buf: BytesMut) { + let cap = buf.capacity(); + let (hw, lw) = self.0.read_wm.get().unpack(); + if cap > lw && cap <= hw { + let v = &mut self.0.read_cache.borrow_mut(); + if v.len() < CACHE_SIZE { + buf.clear(); + v.push(buf); + } + } + } + + #[doc(hidden)] + #[inline] + pub fn get_write_buf(self) -> BytesMut { + if let Some(buf) = self.0.write_cache.borrow_mut().pop() { + buf + } else { + BytesMut::with_capacity_in_priv(self.0.write_wm.get().high as usize, self) + } + } + + #[doc(hidden)] + #[inline] + /// Release write buffer, buf must be allocated from this pool + pub fn release_write_buf(self, mut buf: BytesMut) { + let cap = buf.capacity(); + let (hw, lw) = self.0.write_wm.get().unpack(); + if cap > lw && cap <= hw { + let v = &mut self.0.write_cache.borrow_mut(); + if v.len() < CACHE_SIZE { + buf.clear(); + v.push(buf); + } + } + } + + #[inline] + pub(crate) fn acquire(self, size: usize) { + let prev = self.0.size.fetch_add(size, Relaxed); + if self.0.waker_alive.load(Relaxed) { + self.wake_driver(prev + size) + } + } + + #[inline] + pub(crate) fn release(self, size: usize) { + let prev = self.0.size.fetch_sub(size, Relaxed); + if self.0.waker_alive.load(Relaxed) { + self.wake_driver(prev - size) + } + } + + fn wake_driver(self, allocated: usize) { + let l = self.0.window_l.get(); + let h = self.0.window_h.get(); + if allocated < l || allocated > h { + self.0.waker_alive.store(false, Relaxed); + self.0.waker.wake(); + } + } +} + +impl Default for PoolRef { + 
#[inline] + fn default() -> PoolRef { + PoolId::DEFAULT.pool_ref() + } +} + +impl AsPoolRef for PoolRef { + #[inline] + fn pool_ref(&self) -> PoolRef { + *self + } +} + +impl fmt::Debug for PoolRef { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PoolRef") + .field("id", &self.id().0) + .field("allocated", &self.allocated()) + .finish() + } +} + +impl MemoryPool { + fn create(id: PoolId) -> &'static MemoryPool { + Box::leak(Box::new(MemoryPool { + id, + waker: AtomicWaker::new(), + waker_alive: AtomicBool::new(false), + waiters: RefCell::new(Waiters::new()), + flags: Cell::new(Flags::empty()), + + size: AtomicUsize::new(0), + max_size: Cell::new(0), + + window_h: Cell::new(0), + window_l: Cell::new(0), + window_waiters: Cell::new(0), + window_idx: Cell::new(0), + windows: Default::default(), + + read_wm: Cell::new(BufParams { + high: 4 * 1024, + low: 1024, + }), + read_cache: RefCell::new(Vec::with_capacity(CACHE_SIZE)), + write_wm: Cell::new(BufParams { + high: 4 * 1024, + low: 1024, + }), + write_cache: RefCell::new(Vec::with_capacity(CACHE_SIZE)), + spawn: RefCell::new(None), + })) + } +} + +impl BufParams { + #[inline] + pub fn unpack(self) -> (usize, usize) { + (self.high as usize, self.low as usize) + } +} + +impl Clone for Pool { + fn clone(&self) -> Pool { + Pool { + idx: Cell::new(0), + inner: self.inner, + } + } +} + +impl Drop for Pool { + fn drop(&mut self) { + let idx = self.idx.get(); + if idx > 0 { + // cleanup waiter + let mut waiters = self.inner.waiters.borrow_mut(); + waiters.remove(idx - 1); + waiters.truncate(); + } + } +} + +impl Pool { + #[inline] + /// Get pool id. + pub fn id(&self) -> PoolId { + self.inner.id + } + + #[inline] + /// Check if pool is pedning + pub fn is_pending(&self) -> bool { + let idx = self.idx.get(); + if idx > 0 { + if let Some(Entry::Occupied(_)) = + self.inner.waiters.borrow().entries.get(idx - 1) + { + return true; + } + } + false + } + + #[doc(hidden)] + #[inline] + /// Check if pool is pedning + pub fn windows(&self) -> [(usize, usize); 10] { + self.inner.windows.get() + } + + #[inline] + /// Get `PoolRef` instance for this pool. 
+ pub fn pool_ref(&self) -> PoolRef { + PoolRef(self.inner) + } + + #[inline] + pub fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> { + if self.inner.max_size.get() > 0 { + let allocated = self.inner.size.load(Relaxed); + + // lower than low + let window_l = self.inner.window_l.get(); + if window_l == 0 || allocated < window_l { + let idx = self.idx.get(); + if idx > 0 { + // cleanup waiter + let mut waiters = self.inner.waiters.borrow_mut(); + waiters.remove(idx - 1); + waiters.truncate(); + self.idx.set(0); + } + return Poll::Ready(()); + } + + // register waiter + if let Some(spawn) = &*self.inner.spawn.borrow() { + let idx = self.idx.get(); + let mut flags = self.inner.flags.get(); + let mut waiters = self.inner.waiters.borrow_mut(); + let new = if idx == 0 { + self.idx.set(waiters.append(ctx.waker().clone()) + 1); + true + } else { + waiters.update(idx - 1, ctx.waker().clone()) + }; + + if flags.contains(Flags::INCREASED) || !new { + self.inner + .window_waiters + .set(self.inner.window_waiters.get() + 1); + } else if let Some(waker) = waiters.consume() { + waker.wake(); + } + + // start driver task + if !flags.contains(Flags::SPAWNED) { + flags.insert(Flags::SPAWNED); + self.inner.flags.set(flags); + spawn(Box::pin(Driver { pool: self.inner })) + } + return Poll::Pending; + } + } + Poll::Ready(()) + } +} + +struct Driver { + pool: &'static MemoryPool, +} + +impl Driver { + fn release(&self, waiters_num: usize) { + let mut waiters = self.pool.waiters.borrow_mut(); + + let mut to_release = waiters.occupied_len / 100 * 5; + if waiters_num > to_release { + to_release += waiters_num >> 1; + } else { + to_release += waiters_num; + } + + while to_release > 0 { + if let Some(waker) = waiters.consume() { + waker.wake(); + to_release -= 1; + } else { + break; + } + } + } + + fn release_all(&self) { + let mut waiters = self.pool.waiters.borrow_mut(); + while let Some(waker) = waiters.consume() { + waker.wake(); + } + } +} + +impl Future for Driver { + type Output = (); + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let pool = self.as_ref().pool; + let allocated = pool.size.load(Relaxed); + + let win_l = pool.window_l.get(); + let win_h = pool.window_h.get(); + + // allocated size is decreased, release waiters + if allocated < win_l { + let mut idx = pool.window_idx.get() + 1; + let mut waiters = pool.window_waiters.get(); + let windows = pool.windows.get(); + + loop { + // allocated size decreased more than 10%, release all + if idx == 10 { + self.release_all(); + + pool.window_l.set(windows[0].0); + pool.window_h.set(windows[0].1); + pool.window_idx.set(0); + pool.window_waiters.set(0); + pool.flags.set(Flags::INCREASED); + return Poll::Ready(()); + } else { + // release 5% of pending waiters + self.release(waiters); + + if allocated > windows[idx].0 { + pool.window_l.set(windows[idx].0); + pool.window_h.set(windows[idx].1); + pool.window_idx.set(idx); + pool.window_waiters.set(0); + pool.flags.set(Flags::SPAWNED); + break; + } + idx += 1; + waiters = 0; + } + } + } + // allocated size is increased + else if allocated > win_h { + // reset window + let idx = pool.window_idx.get() - 1; + let windows = pool.windows.get(); + pool.window_l.set(windows[idx].0); + pool.window_h.set(windows[idx].1); + pool.window_idx.set(idx); + pool.window_waiters.set(0); + pool.flags.set(Flags::SPAWNED | Flags::INCREASED); + } + + // register waker + pool.waker.register(cx.waker()); + pool.waker_alive.store(true, Release); + + Poll::Pending + } +} + +struct Waiters { + 
entries: Vec, + root: usize, + tail: usize, + free: usize, + len: usize, + occupied_len: usize, +} + +#[derive(Debug)] +enum Entry { + Vacant(usize), + Consumed, + Occupied(Node), +} + +#[derive(Debug)] +struct Node { + item: Waker, + prev: usize, + next: usize, +} + +impl Waiters { + fn new() -> Waiters { + Waiters { + entries: Vec::new(), + root: usize::MAX, + tail: usize::MAX, + free: 0, + len: 0, + occupied_len: 0, + } + } + + fn truncate(&mut self) { + if self.len == 0 { + self.entries.truncate(0); + self.root = usize::MAX; + self.tail = usize::MAX; + self.free = 0; + } + } + + fn get_node(&mut self, key: usize) -> &mut Node { + if let Some(Entry::Occupied(ref mut node)) = self.entries.get_mut(key) { + return node; + } + unreachable!() + } + + // consume root item + fn consume(&mut self) -> Option { + if self.root != usize::MAX { + self.occupied_len -= 1; + + let entry = self.entries.get_mut(self.root).unwrap(); + let prev = mem::replace(entry, Entry::Consumed); + + match prev { + Entry::Occupied(node) => { + debug_assert!(node.prev == usize::MAX); + + // last item + if self.tail == self.root { + self.tail = usize::MAX; + self.root = usize::MAX; + } else { + // remove from root + self.root = node.next; + self.get_node(self.root).prev = usize::MAX; + } + Some(node.item) + } + _ => { + unreachable!() + } + } + } else { + None + } + } + + fn update(&mut self, key: usize, val: Waker) -> bool { + if let Some(entry) = self.entries.get_mut(key) { + match entry { + Entry::Occupied(ref mut node) => { + node.item = val; + return false; + } + Entry::Consumed => { + *entry = Entry::Occupied(Node { + item: val, + prev: self.tail, + next: usize::MAX, + }); + } + _ => unreachable!(), + } + } + self.occupied_len += 1; + if self.root == usize::MAX { + self.root = key; + } + if self.tail != usize::MAX { + self.get_node(self.tail).next = key; + } + self.tail = key; + true + } + + fn remove(&mut self, key: usize) { + if let Some(entry) = self.entries.get_mut(key) { + // Swap the entry at the provided value + let prev = mem::replace(entry, Entry::Vacant(self.free)); + + match prev { + Entry::Occupied(node) => { + self.len -= 1; + self.occupied_len -= 1; + self.free = key; + // remove from root + if self.root == key { + self.root = node.next; + if self.root != usize::MAX { + self.get_node(self.root).prev = usize::MAX; + } + } + // remove from tail + if self.tail == key { + self.tail = node.prev; + if self.tail != usize::MAX { + self.get_node(self.tail).next = usize::MAX; + } + } + } + Entry::Consumed => { + self.len -= 1; + self.free = key; + } + _ => { + unreachable!() + } + } + } + } + + fn append(&mut self, val: Waker) -> usize { + self.len += 1; + self.occupied_len += 1; + let key = self.free; + + if key == self.entries.len() { + if self.root == usize::MAX { + self.root = key; + } + if self.tail != usize::MAX { + self.get_node(self.tail).next = key; + } + + self.entries.push(Entry::Occupied(Node { + item: val, + prev: self.tail, + next: usize::MAX, + })); + self.tail = key; + self.free = key + 1; + } else { + self.free = match self.entries.get(key) { + Some(&Entry::Vacant(next)) => next, + _ => unreachable!(), + }; + if self.tail != usize::MAX { + self.get_node(self.tail).next = key; + } + self.entries[key] = Entry::Occupied(Node { + item: val, + prev: self.tail, + next: usize::MAX, + }); + self.tail = key; + } + key + } +} diff --git a/ntex-bytes/src/string.rs b/ntex-bytes/src/string.rs index 7bf515d1..71da803d 100644 --- a/ntex-bytes/src/string.rs +++ b/ntex-bytes/src/string.rs @@ -1,7 +1,7 @@ //! 
A UTF-8 encoded read-only string using Bytes as storage. use std::{borrow, convert::TryFrom, fmt, hash, ops, slice, str}; -use crate::Bytes; +use crate::{Bytes, BytesMut}; /// An immutable UTF-8 encoded string with [`Bytes`] as a storage. #[derive(Clone, Default, Eq, PartialOrd, Ord)] @@ -242,7 +242,7 @@ impl TryFrom for ByteString { } } -impl TryFrom for ByteString { +impl TryFrom for ByteString { type Error = str::Utf8Error; #[inline] diff --git a/ntex-bytes/tests/test_bytes.rs b/ntex-bytes/tests/test_bytes.rs index d05e8954..a1c2e019 100644 --- a/ntex-bytes/tests/test_bytes.rs +++ b/ntex-bytes/tests/test_bytes.rs @@ -1,6 +1,7 @@ -#![deny(warnings, rust_2018_idioms)] +//#![deny(warnings, rust_2018_idioms)] +use std::task::Poll; -use ntex_bytes::{Buf, BufMut, Bytes, BytesMut}; +use ntex_bytes::{Buf, BufMut, Bytes, BytesMut, PoolId}; const LONG: &'static [u8] = b"mary had a little lamb, little lamb, little lamb"; const SHORT: &'static [u8] = b"hello world"; @@ -10,6 +11,11 @@ fn inline_cap() -> usize { 4 * mem::size_of::() - 1 } +const fn shared_vec() -> usize { + use std::mem; + 3 * mem::size_of::() +} + fn is_sync() {} fn is_send() {} @@ -108,6 +114,37 @@ fn len() { assert!(a.is_empty()); } +#[test] +fn inline() { + let a = Bytes::from("abcdefg".to_string()); + assert!(a.is_inline()); + + let a = BytesMut::from(&b"abcdefg"[..]).freeze(); + assert!(a.is_inline()); + + let a = Bytes::from("".to_string()); + assert!(a.is_empty()); + + let a = BytesMut::from(&b""[..]).freeze(); + assert!(a.is_inline()); + + let mut a = BytesMut::from(vec![b'*'; 35]).freeze(); + assert!(!a.is_inline()); + + let b = a.split_to(8); + assert!(b.is_inline()); + assert!(a.is_inline()); + + let mut a = BytesMut::from(vec![b'*'; 35]).freeze(); + let b = a.split_off(8); + assert!(b.is_inline()); + assert!(a.is_inline()); + + let mut a = BytesMut::from(vec![b'*'; 35]).freeze(); + a.truncate(8); + assert!(a.is_inline()); +} + #[test] fn index() { let a = Bytes::from(&b"hello world"[..]); @@ -194,7 +231,7 @@ fn split_off_to_loop() { let mut bytes = Bytes::from(&s[..]); let off = bytes.split_off(i); assert_eq!(i, bytes.len()); - let mut sum = Vec::new(); + let mut sum: Vec = Vec::new(); sum.extend(bytes.iter()); sum.extend(off.iter()); assert_eq!(&s[..], &sum[..]); @@ -203,7 +240,7 @@ fn split_off_to_loop() { let mut bytes = BytesMut::from(&s[..]); let off = bytes.split_off(i); assert_eq!(i, bytes.len()); - let mut sum = Vec::new(); + let mut sum: Vec = Vec::new(); sum.extend(&bytes); sum.extend(&off); assert_eq!(&s[..], &sum[..]); @@ -212,7 +249,7 @@ fn split_off_to_loop() { let mut bytes = Bytes::from(&s[..]); let off = bytes.split_to(i); assert_eq!(i, off.len()); - let mut sum = Vec::new(); + let mut sum: Vec = Vec::new(); sum.extend(off.iter()); sum.extend(bytes.iter()); assert_eq!(&s[..], &sum[..]); @@ -221,7 +258,7 @@ fn split_off_to_loop() { let mut bytes = BytesMut::from(&s[..]); let off = bytes.split_to(i); assert_eq!(i, off.len()); - let mut sum = Vec::new(); + let mut sum: Vec = Vec::new(); sum.extend(&off); sum.extend(&bytes); assert_eq!(&s[..], &sum[..]); @@ -232,20 +269,20 @@ fn split_off_to_loop() { #[test] fn split_to_1() { // Inline - let mut a = Bytes::from(SHORT); + let mut a = Bytes::from(&SHORT[..]); let b = a.split_to(4); assert_eq!(SHORT[4..], a); assert_eq!(SHORT[..4], b); // Allocated - let mut a = Bytes::from(LONG); + let mut a = Bytes::from(Vec::from(LONG)); let b = a.split_to(4); assert_eq!(LONG[4..], a); assert_eq!(LONG[..4], b); - let mut a = Bytes::from(LONG); + let mut a = 
Bytes::from(Vec::from(LONG)); let b = a.split_to(30); assert_eq!(LONG[30..], a); @@ -322,31 +359,13 @@ fn fns_defined_for_bytes_mut() { #[test] fn reserve_convert() { - // Inline -> Vec - let mut bytes = BytesMut::with_capacity(8); - bytes.put("hello".as_bytes()); - bytes.reserve(40); - assert_eq!(bytes.capacity(), 45); - assert_eq!(bytes, "hello"); - - // Inline -> Inline - let mut bytes = BytesMut::with_capacity(inline_cap()); - bytes.put("abcdefghijkl".as_bytes()); - - let a = bytes.split_to(10); - bytes.reserve(inline_cap() - 3); - assert_eq!(inline_cap(), bytes.capacity()); - - assert_eq!(bytes, "kl"); - assert_eq!(a, "abcdefghij"); - // Vec -> Vec let mut bytes = BytesMut::from(LONG); bytes.reserve(64); assert_eq!(bytes.capacity(), LONG.len() + 64); // Arc -> Vec - let mut bytes = BytesMut::from(LONG); + let mut bytes = BytesMut::from(Vec::from(LONG)); let a = bytes.split_to(30); bytes.reserve(128); @@ -355,46 +374,6 @@ fn reserve_convert() { drop(a); } -#[test] -fn reserve_growth() { - let mut bytes = BytesMut::with_capacity(64); - bytes.put("hello world".as_bytes()); - let _ = bytes.split(); - - bytes.reserve(65); - assert_eq!(bytes.capacity(), 128); -} - -#[test] -fn reserve_allocates_at_least_original_capacity() { - let mut bytes = BytesMut::with_capacity(1024); - - for i in 0..1020 { - bytes.put_u8(i as u8); - } - - let _other = bytes.split(); - - bytes.reserve(16); - assert_eq!(bytes.capacity(), 1024); -} - -#[test] -fn reserve_max_original_capacity_value() { - const SIZE: usize = 128 * 1024; - - let mut bytes = BytesMut::with_capacity(SIZE); - - for _ in 0..SIZE { - bytes.put_u8(0u8); - } - - let _other = bytes.split(); - - bytes.reserve(16); - assert_eq!(bytes.capacity(), 64 * 1024); -} - // Without either looking at the internals of the BytesMut or doing weird stuff // with the memory allocator, there's no good way to automatically verify from // within the program that this actually recycles memory. 
Instead, just exercise @@ -431,7 +410,7 @@ fn reserve_in_arc_unique_doubles() { assert_eq!(1000, bytes.capacity()); bytes.reserve(1001); - assert_eq!(2000, bytes.capacity()); + assert_eq!(1001, bytes.capacity()); } #[test] @@ -462,13 +441,6 @@ fn extend_mut() { assert_eq!(*bytes, LONG[..]); } -#[test] -fn extend_shr() { - let mut bytes = Bytes::new(); - bytes.extend(LONG); - assert_eq!(*bytes, LONG[..]); -} - #[test] fn extend_from_slice_mut() { for &i in &[3, 34] { @@ -479,16 +451,6 @@ fn extend_from_slice_mut() { } } -#[test] -fn extend_from_slice_shr() { - for &i in &[3, 34] { - let mut bytes = Bytes::new(); - bytes.extend_from_slice(&LONG[..i]); - bytes.extend_from_slice(&LONG[i..]); - assert_eq!(LONG[..], *bytes); - } -} - #[test] fn from_static() { let mut a = Bytes::from_static(b"ab"); @@ -585,258 +547,6 @@ fn partial_eq_bytesmut() { assert!(bytesmut != bytes2); } -#[test] -fn bytes_unsplit_basic() { - let mut buf = Bytes::with_capacity(64); - buf.extend_from_slice(b"aaabbbcccddd"); - - let splitted = buf.split_off(6); - assert_eq!(b"aaabbb", &buf[..]); - assert_eq!(b"cccddd", &splitted[..]); - - buf.unsplit(splitted); - assert_eq!(b"aaabbbcccddd", &buf[..]); -} - -#[test] -fn bytes_unsplit_empty_other() { - let mut buf = Bytes::with_capacity(64); - buf.extend_from_slice(b"aaabbbcccddd"); - - // empty other - let other = Bytes::new(); - - buf.unsplit(other); - assert_eq!(b"aaabbbcccddd", &buf[..]); -} - -#[test] -fn bytes_unsplit_empty_self() { - // empty self - let mut buf = Bytes::new(); - - let mut other = Bytes::with_capacity(64); - other.extend_from_slice(b"aaabbbcccddd"); - - buf.unsplit(other); - assert_eq!(b"aaabbbcccddd", &buf[..]); -} - -#[test] -fn bytes_unsplit_inline_arc() { - let mut buf = Bytes::with_capacity(8); //inline - buf.extend_from_slice(b"aaaabbbb"); - - let mut buf2 = Bytes::with_capacity(64); - buf2.extend_from_slice(b"ccccddddeeee"); - - buf2.split_off(8); //arc - - buf.unsplit(buf2); - assert_eq!(b"aaaabbbbccccdddd", &buf[..]); -} - -#[test] -fn bytes_unsplit_arc_inline() { - let mut buf = Bytes::with_capacity(64); - buf.extend_from_slice(b"aaaabbbbeeee"); - - buf.split_off(8); //arc - - let mut buf2 = Bytes::with_capacity(8); //inline - buf2.extend_from_slice(b"ccccdddd"); - - buf.unsplit(buf2); - assert_eq!(b"aaaabbbbccccdddd", &buf[..]); -} - -#[test] -fn bytes_unsplit_both_inline() { - let mut buf = Bytes::with_capacity(16); //inline - buf.extend_from_slice(b"aaaabbbbccccdddd"); - - let splitted = buf.split_off(8); // both inline - assert_eq!(b"aaaabbbb", &buf[..]); - assert_eq!(b"ccccdddd", &splitted[..]); - - buf.unsplit(splitted); - assert_eq!(b"aaaabbbbccccdddd", &buf[..]); -} - -#[test] -fn bytes_unsplit_arc_different() { - let mut buf = Bytes::with_capacity(64); - buf.extend_from_slice(b"aaaabbbbeeee"); - - buf.split_off(8); //arc - - let mut buf2 = Bytes::with_capacity(64); - buf2.extend_from_slice(b"ccccddddeeee"); - - buf2.split_off(8); //arc - - buf.unsplit(buf2); - assert_eq!(b"aaaabbbbccccdddd", &buf[..]); -} - -#[test] -fn bytes_unsplit_arc_non_contiguous() { - let mut buf = Bytes::with_capacity(64); - buf.extend_from_slice(b"aaaabbbbeeeeccccdddd"); - - let mut buf2 = buf.split_off(8); //arc - - let buf3 = buf2.split_off(4); //arc - - buf.unsplit(buf3); - assert_eq!(b"aaaabbbbccccdddd", &buf[..]); -} - -#[test] -fn bytes_unsplit_two_split_offs() { - let mut buf = Bytes::with_capacity(64); - buf.extend_from_slice(b"aaaabbbbccccdddd"); - - let mut buf2 = buf.split_off(8); //arc - let buf3 = buf2.split_off(4); //arc - - buf2.unsplit(buf3); - 
buf.unsplit(buf2); - assert_eq!(b"aaaabbbbccccdddd", &buf[..]); -} - -#[test] -fn bytes_unsplit_overlapping_references() { - let mut buf = Bytes::with_capacity(64); - buf.extend_from_slice(b"abcdefghijklmnopqrstuvwxyz"); - let mut buf0010 = buf.slice(0..10); - let buf1020 = buf.slice(10..20); - let buf0515 = buf.slice(5..15); - buf0010.unsplit(buf1020); - assert_eq!(b"abcdefghijklmnopqrst", &buf0010[..]); - assert_eq!(b"fghijklmno", &buf0515[..]); -} - -#[test] -fn bytes_mut_unsplit_basic() { - let mut buf = BytesMut::with_capacity(64); - buf.extend_from_slice(b"aaabbbcccddd"); - - let splitted = buf.split_off(6); - assert_eq!(b"aaabbb", &buf[..]); - assert_eq!(b"cccddd", &splitted[..]); - - buf.unsplit(splitted); - assert_eq!(b"aaabbbcccddd", &buf[..]); -} - -#[test] -fn bytes_mut_unsplit_empty_other() { - let mut buf = BytesMut::with_capacity(64); - buf.extend_from_slice(b"aaabbbcccddd"); - - // empty other - let other = BytesMut::new(); - - buf.unsplit(other); - assert_eq!(b"aaabbbcccddd", &buf[..]); -} - -#[test] -fn bytes_mut_unsplit_empty_self() { - // empty self - let mut buf = BytesMut::new(); - - let mut other = BytesMut::with_capacity(64); - other.extend_from_slice(b"aaabbbcccddd"); - - buf.unsplit(other); - assert_eq!(b"aaabbbcccddd", &buf[..]); -} - -#[test] -fn bytes_mut_unsplit_inline_arc() { - let mut buf = BytesMut::with_capacity(8); //inline - buf.extend_from_slice(b"aaaabbbb"); - - let mut buf2 = BytesMut::with_capacity(64); - buf2.extend_from_slice(b"ccccddddeeee"); - - buf2.split_off(8); //arc - - buf.unsplit(buf2); - assert_eq!(b"aaaabbbbccccdddd", &buf[..]); -} - -#[test] -fn bytes_mut_unsplit_arc_inline() { - let mut buf = BytesMut::with_capacity(64); - buf.extend_from_slice(b"aaaabbbbeeee"); - - buf.split_off(8); //arc - - let mut buf2 = BytesMut::with_capacity(8); //inline - buf2.extend_from_slice(b"ccccdddd"); - - buf.unsplit(buf2); - assert_eq!(b"aaaabbbbccccdddd", &buf[..]); -} - -#[test] -fn bytes_mut_unsplit_both_inline() { - let mut buf = BytesMut::with_capacity(16); //inline - buf.extend_from_slice(b"aaaabbbbccccdddd"); - - let splitted = buf.split_off(8); // both inline - assert_eq!(b"aaaabbbb", &buf[..]); - assert_eq!(b"ccccdddd", &splitted[..]); - - buf.unsplit(splitted); - assert_eq!(b"aaaabbbbccccdddd", &buf[..]); -} - -#[test] -fn bytes_mut_unsplit_arc_different() { - let mut buf = BytesMut::with_capacity(64); - buf.extend_from_slice(b"aaaabbbbeeee"); - - buf.split_off(8); //arc - - let mut buf2 = BytesMut::with_capacity(64); - buf2.extend_from_slice(b"ccccddddeeee"); - - buf2.split_off(8); //arc - - buf.unsplit(buf2); - assert_eq!(b"aaaabbbbccccdddd", &buf[..]); -} - -#[test] -fn bytes_mut_unsplit_arc_non_contiguous() { - let mut buf = BytesMut::with_capacity(64); - buf.extend_from_slice(b"aaaabbbbeeeeccccdddd"); - - let mut buf2 = buf.split_off(8); //arc - - let buf3 = buf2.split_off(4); //arc - - buf.unsplit(buf3); - assert_eq!(b"aaaabbbbccccdddd", &buf[..]); -} - -#[test] -fn bytes_mut_unsplit_two_split_offs() { - let mut buf = BytesMut::with_capacity(64); - buf.extend_from_slice(b"aaaabbbbccccdddd"); - - let mut buf2 = buf.split_off(8); //arc - let buf3 = buf2.split_off(4); //arc - - buf2.unsplit(buf3); - buf.unsplit(buf2); - assert_eq!(b"aaaabbbbccccdddd", &buf[..]); -} - #[test] fn from_iter_no_size_hint() { use std::iter; @@ -910,3 +620,82 @@ fn empty_slice_ref_catches_not_an_empty_subset() { bytes.slice_ref(slice); } + +#[test] +fn pool() { + // Pool + let p1 = PoolId::P1.pool_ref(); + assert_eq!(p1.allocated(), 0); + let mut buf = 
BytesMut::with_capacity_in(1024, p1); + assert_eq!(p1.allocated(), 1024 + shared_vec()); + buf.reserve(2048); + assert_eq!(p1.allocated(), 2048 + shared_vec()); + drop(buf); + assert_eq!(p1.allocated(), 0); + + // Default pool + let p = PoolId::DEFAULT.pool_ref(); + assert_eq!(p.allocated(), 0); + let mut buf = BytesMut::with_capacity(1024); + assert_eq!(p.allocated(), 1024 + shared_vec()); + buf.reserve(2048); + assert_eq!(p.allocated(), 2048 + shared_vec()); + drop(buf); + assert_eq!(p.allocated(), 0); + + let mut buf = BytesMut::with_capacity(1024); + assert_eq!(p.allocated(), 1024 + shared_vec()); + assert_eq!(p1.allocated(), 0); + p1.move_in(&mut buf); + assert_eq!(p.allocated(), 0); + assert_eq!(p1.allocated(), 1024 + shared_vec()); +} + +#[ntex::test] +async fn pool_usage() { + use ntex::{time, util}; + + PoolId::set_spawn_fn(|f| { + let _ = ntex::rt::spawn(f); + }); + + let p_ref = PoolId::P1.pool_ref().set_pool_size(10 * 1024); + let p1 = p_ref.pool(); + let p2 = p_ref.pool(); + + assert_eq!(Poll::Ready(()), util::lazy(|cx| p1.poll_ready(cx)).await); + + let buf = BytesMut::with_capacity_in(11 * 1024, p_ref); + assert_eq!(Poll::Pending, util::lazy(|cx| p1.poll_ready(cx)).await); + assert!(p1.is_pending()); + assert_eq!(Poll::Pending, util::lazy(|cx| p2.poll_ready(cx)).await); + assert!(p2.is_pending()); + time::sleep(time::Millis(50)).await; + drop(buf); + + time::sleep(time::Millis(50)).await; + assert!(!p1.is_pending()); + assert!(!p2.is_pending()); + + assert_eq!(Poll::Ready(()), util::lazy(|cx| p1.poll_ready(cx)).await); + assert_eq!(Poll::Ready(()), util::lazy(|cx| p2.poll_ready(cx)).await); + + // pool is full + let buf = BytesMut::with_capacity_in(11 * 1024, p_ref); + assert_eq!(Poll::Pending, util::lazy(|cx| p1.poll_ready(cx)).await); + assert_eq!(Poll::Pending, util::lazy(|cx| p2.poll_ready(cx)).await); + drop(buf); + + // pool has some space + let buf = BytesMut::with_capacity_in(10100, p_ref); + time::sleep(time::Millis(50)).await; + assert!(!p1.is_pending()); + assert!(p2.is_pending()); + + // new pools should wait for next update + assert_eq!(Poll::Pending, util::lazy(|cx| p1.poll_ready(cx)).await); + drop(buf); + time::sleep(time::Millis(50)).await; + assert!(!p1.is_pending()); + assert!(!p2.is_pending()); +} diff --git a/ntex-util/Cargo.toml b/ntex-util/Cargo.toml index db64e254..02cf4572 100644 --- a/ntex-util/Cargo.toml +++ b/ntex-util/Cargo.toml @@ -18,8 +18,8 @@ path = "src/lib.rs" [dependencies] bitflags = "1.2" slab = "0.4" -futures-core = { version = "0.3.13", default-features = false, features = ["alloc"] } -futures-sink = { version = "0.3.13", default-features = false, features = ["alloc"] } +futures-core = { version = "0.3.18", default-features = false, features = ["alloc"] } +futures-sink = { version = "0.3.18", default-features = false, features = ["alloc"] } pin-project-lite = "0.2.6" [dev-dependencies] diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index dd4e30bd..7d27e176 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.4.11] - 2021-12-xx + +* framed: Use memory pool + ## [0.4.10] - 2021-11-29 * Fix potential overflow sub in timer wheel @@ -10,7 +14,6 @@ * Update webpki to 0.22 * Update webpki-roots to 0.22 * Update tokio-rustls to 0.23 -* Update tokio-ssl to 0.6.3 * Adapt code for rustls breaking changes ## [0.4.8] - 2021-11-08 diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index d9e2e8b6..d0d0b6d6 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.4.10" +version 
= "0.4.11" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -49,14 +49,14 @@ ntex-router = "0.5.1" ntex-service = "0.2.1" ntex-macros = "0.1.3" ntex-util = "0.1.1" -ntex-bytes = "0.1.4" +ntex-bytes = "0.1.5" base64 = "0.13" bitflags = "1.3" derive_more = "0.99.14" fxhash = "0.2.1" -futures-core = { version = "0.3.16", default-features = false, features = ["alloc"] } -futures-sink = { version = "0.3.16", default-features = false, features = ["alloc"] } +futures-core = { version = "0.3.18", default-features = false, features = ["alloc"] } +futures-sink = { version = "0.3.18", default-features = false, features = ["alloc"] } log = "0.4" mio = "0.7.11" num_cpus = "1.13" @@ -86,7 +86,7 @@ coo-kie = { version = "0.15", package = "cookie", optional = true } # openssl open-ssl = { version="0.10", package = "openssl", optional = true } -tokio-openssl = { version = "0.6.3", optional = true } +tokio-openssl = { version = "0.6", optional = true } # rustls rust-tls = { version = "0.20", package = "rustls", optional = true } @@ -97,7 +97,7 @@ webpki-roots = { version = "0.22", optional = true } # compression brotli2 = { version="0.3.2", optional = true } -flate2 = { version = "1.0.20", optional = true } +flate2 = { version = "1.0.22", optional = true } [dev-dependencies] env_logger = "0.9" @@ -106,4 +106,4 @@ time = "0.2" open-ssl = { version="0.10", package = "openssl" } rust-tls = { version = "0.20", package="rustls", features = ["dangerous_configuration"] } webpki = "0.21" -futures = "0.3.16" +futures = "0.3" diff --git a/ntex/src/framed/dispatcher.rs b/ntex/src/framed/dispatcher.rs index 6afffa46..416b8ae5 100644 --- a/ntex/src/framed/dispatcher.rs +++ b/ntex/src/framed/dispatcher.rs @@ -8,7 +8,7 @@ use crate::codec::{AsyncRead, AsyncWrite, Decoder, Encoder}; use crate::framed::{DispatchItem, Read, ReadTask, State, Timer, Write, WriteTask}; use crate::service::{IntoService, Service}; use crate::time::Seconds; -use crate::util::Either; +use crate::util::{Either, Pool}; type Response = ::Item; @@ -43,6 +43,7 @@ where ka_updated: Cell, error: Cell>, shared: Rc>, + pool: Pool, } struct DispatcherShared @@ -128,9 +129,7 @@ where service: service.into_service(), fut: None, inner: DispatcherInner { - state, - timer, - ka_timeout, + pool: state.memory_pool().pool(), ka_updated: Cell::new(updated), error: Cell::new(None), st: Cell::new(DispatcherState::Processing), @@ -139,6 +138,9 @@ where error: Cell::new(None), inflight: Cell::new(0), }), + state, + timer, + ka_timeout, }, } } @@ -222,6 +224,12 @@ where } } + // handle memory pool pressure + if slf.pool.poll_ready(cx).is_pending() { + read.pause(cx.waker()); + return Poll::Pending; + } + loop { match slf.st.get() { DispatcherState::Processing => { @@ -517,7 +525,7 @@ mod tests { use crate::codec::BytesCodec; use crate::testing::Io; use crate::time::{sleep, Millis}; - use crate::util::{Bytes, Ready}; + use crate::util::{Bytes, PoolRef, Ready}; use super::*; @@ -567,6 +575,7 @@ mod tests { state: state.clone(), error: Cell::new(None), st: Cell::new(DispatcherState::Processing), + pool: state.memory_pool().pool(), }, }, state, @@ -597,11 +606,10 @@ mod tests { }); sleep(Millis(25)).await; - client.write("GET /test HTTP/1\r\n\r\n"); - let buf = client.read().await.unwrap(); assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n")); + client.write("GET /test HTTP/1\r\n\r\n"); let buf = client.read().await.unwrap(); assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n")); @@ 
-774,7 +782,9 @@ mod tests { } }), ); - state.set_buffer_params(8 * 1024, 16 * 1024, 1024); + let pool = PoolRef::default(); + pool.set_read_params(8 * 1024, 1024); + pool.set_write_params(16 * 1024, 1024); crate::rt::spawn(async move { let _ = disp.await; }); diff --git a/ntex/src/framed/state.rs b/ntex/src/framed/state.rs index 643a5523..7a023937 100644 --- a/ntex/src/framed/state.rs +++ b/ntex/src/framed/state.rs @@ -7,7 +7,7 @@ use slab::Slab; use crate::codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts}; use crate::task::LocalWaker; use crate::time::Seconds; -use crate::util::{poll_fn, Buf, BytesMut, Either}; +use crate::util::{poll_fn, Buf, BytesMut, Either, PoolId, PoolRef}; bitflags::bitflags! { pub struct Flags: u16 { @@ -43,9 +43,7 @@ pub struct State(Rc); pub(crate) struct IoStateInner { flags: Cell, - lw: Cell, - read_hw: Cell, - write_hw: Cell, + pool: Cell, disconnect_timeout: Cell, error: Cell>, read_task: LocalWaker, @@ -56,29 +54,6 @@ pub(crate) struct IoStateInner { on_disconnect: RefCell>>, } -thread_local!(static R_BYTES_POOL: RefCell> = RefCell::new(Vec::with_capacity(16))); -thread_local!(static W_BYTES_POOL: RefCell> = RefCell::new(Vec::with_capacity(16))); - -fn release_to_r_pool(mut buf: BytesMut) { - R_BYTES_POOL.with(|pool| { - let v = &mut pool.borrow_mut(); - if v.len() < 16 { - buf.clear(); - v.push(buf); - } - }) -} - -fn release_to_w_pool(mut buf: BytesMut) { - W_BYTES_POOL.with(|pool| { - let v = &mut pool.borrow_mut(); - if v.len() < 16 { - buf.clear(); - v.push(buf); - } - }) -} - impl IoStateInner { fn insert_flags(&self, f: Flags) { let mut flags = self.flags.get(); @@ -96,13 +71,7 @@ impl IoStateInner { if let Some(buf) = self.read_buf.take() { buf } else { - R_BYTES_POOL.with(|pool| { - if let Some(buf) = pool.borrow_mut().pop() { - buf - } else { - BytesMut::with_capacity(self.read_hw.get() as usize) - } - }) + self.pool.get().get_read_buf() } } @@ -110,21 +79,13 @@ impl IoStateInner { if let Some(buf) = self.write_buf.take() { buf } else { - W_BYTES_POOL.with(|pool| { - if let Some(buf) = pool.borrow_mut().pop() { - buf - } else { - BytesMut::with_capacity(self.write_hw.get() as usize) - } - }) + self.pool.get().get_write_buf() } } fn release_read_buf(&self, buf: BytesMut) { if buf.is_empty() { - if buf.capacity() > (self.lw.get() as usize) { - release_to_r_pool(buf); - } + self.pool.get().release_read_buf(buf); } else { self.read_buf.set(Some(buf)); } @@ -132,10 +93,7 @@ impl IoStateInner { fn release_write_buf(&self, buf: BytesMut) { if buf.is_empty() { - let cap = buf.capacity(); - if cap > (self.lw.get() as usize) && cap <= self.write_hw.get() as usize { - release_to_w_pool(buf); - } + self.pool.get().release_write_buf(buf); } else { self.write_buf.set(Some(buf)); } @@ -145,16 +103,10 @@ impl IoStateInner { impl Drop for IoStateInner { fn drop(&mut self) { if let Some(buf) = self.read_buf.take() { - let cap = buf.capacity(); - if cap > (self.lw.get() as usize) && cap <= self.read_hw.get() as usize { - release_to_r_pool(buf); - } + self.pool.get().release_read_buf(buf); } if let Some(buf) = self.write_buf.take() { - let cap = buf.capacity(); - if cap > (self.lw.get() as usize) && cap <= self.write_hw.get() as usize { - release_to_w_pool(buf); - } + self.pool.get().release_write_buf(buf); } } } @@ -183,12 +135,16 @@ impl State { #[inline] /// Create `State` instance pub fn new() -> Self { + Self::with_memory_pool(PoolId::DEFAULT.pool_ref()) + } + + #[inline] + /// Create `State` instance with specific memory pool. 
+ pub fn with_memory_pool(pool: PoolRef) -> Self { State(Rc::new(IoStateInner { + pool: Cell::new(pool), flags: Cell::new(Flags::empty()), error: Cell::new(None), - lw: Cell::new(1024), - read_hw: Cell::new(8 * 1024), - write_hw: Cell::new(8 * 1024), disconnect_timeout: Cell::new(Seconds(1)), dispatch_task: LocalWaker::new(), read_task: LocalWaker::new(), @@ -202,13 +158,16 @@ impl State { #[inline] /// Create `State` from Framed pub fn from_framed(framed: Framed) -> (Io, U, Self) { - let parts = framed.into_parts(); + let pool = PoolId::DEFAULT.pool_ref(); + let mut parts = framed.into_parts(); let read_buf = if !parts.read_buf.is_empty() { + pool.move_in(&mut parts.read_buf); Cell::new(Some(parts.read_buf)) } else { Cell::new(None) }; let write_buf = if !parts.write_buf.is_empty() { + pool.move_in(&mut parts.write_buf); Cell::new(Some(parts.write_buf)) } else { Cell::new(None) @@ -217,11 +176,9 @@ impl State { let state = State(Rc::new(IoStateInner { read_buf, write_buf, + pool: Cell::new(pool), flags: Cell::new(Flags::empty()), error: Cell::new(None), - lw: Cell::new(1024), - read_hw: Cell::new(8 * 1024), - write_hw: Cell::new(8 * 1024), disconnect_timeout: Cell::new(Seconds(1)), dispatch_task: LocalWaker::new(), read_task: LocalWaker::new(), @@ -231,20 +188,19 @@ impl State { (parts.io, parts.codec, state) } + #[doc(hidden)] #[inline] /// Create `State` instance with custom params pub fn with_params( - max_read_buf_size: u16, - max_write_buf_size: u16, - min_buf_size: u16, + _max_read_buf_size: u16, + _max_write_buf_size: u16, + _min_buf_size: u16, disconnect_timeout: Seconds, ) -> Self { State(Rc::new(IoStateInner { + pool: Cell::new(PoolId::DEFAULT.pool_ref()), flags: Cell::new(Flags::empty()), error: Cell::new(None), - lw: Cell::new(min_buf_size), - read_hw: Cell::new(max_read_buf_size), - write_hw: Cell::new(max_write_buf_size), disconnect_timeout: Cell::new(disconnect_timeout), dispatch_task: LocalWaker::new(), read_buf: Cell::new(None), @@ -275,10 +231,8 @@ impl State { pub(crate) fn keepalive_timeout(&self) { let state = self.0.as_ref(); - let mut flags = state.flags.get(); - flags.insert(Flags::DSP_KEEPALIVE); - state.flags.set(flags); state.dispatch_task.wake(); + state.insert_flags(Flags::DSP_KEEPALIVE); } pub(super) fn get_disconnect_timeout(&self) -> Seconds { @@ -304,19 +258,38 @@ impl State { self.0.flags.get() } + #[inline] + /// Get memory pool + pub fn memory_pool(&self) -> PoolRef { + self.0.pool.get() + } + + #[inline] + /// Set memory pool + pub fn set_memory_pool(&self, pool: PoolRef) { + if let Some(mut buf) = self.0.read_buf.take() { + pool.move_in(&mut buf); + self.0.read_buf.set(Some(buf)); + } + if let Some(mut buf) = self.0.write_buf.take() { + pool.move_in(&mut buf); + self.0.write_buf.set(Some(buf)); + } + self.0.pool.set(pool) + } + + #[doc(hidden)] + #[deprecated(since = "0.4.11", note = "Use memory pool config")] #[inline] /// Set read/write buffer sizes /// /// By default read max buf size is 8kb, write max buf size is 8kb pub fn set_buffer_params( &self, - max_read_buf_size: u16, - max_write_buf_size: u16, - min_buf_size: u16, + _max_read_buf_size: u16, + _max_write_buf_size: u16, + _min_buf_size: u16, ) { - self.0.read_hw.set(max_read_buf_size); - self.0.write_hw.set(max_write_buf_size); - self.0.lw.set(min_buf_size); } #[inline] @@ -622,7 +595,7 @@ impl State { T: AsyncRead + AsyncWrite + Unpin, { let inner = self.0.as_ref(); - let lw = inner.lw.get() as usize; + let (hw, lw) = inner.pool.get().read_params().unpack(); let mut buf = 
inner.get_read_buf(); // read data from socket @@ -631,7 +604,7 @@ impl State { // make sure we've got room let remaining = buf.capacity() - buf.len(); if remaining < lw { - buf.reserve((inner.read_hw.get() as usize) - remaining); + buf.reserve(hw - remaining); } match crate::codec::poll_read_buf(Pin::new(&mut *io), cx, &mut buf) { @@ -643,7 +616,7 @@ impl State { self.set_io_error(None); return false; } else { - if buf.len() > inner.read_hw.get() as usize { + if buf.len() > hw { log::trace!( "buffer is too large {}, enable read back-pressure", buf.len() @@ -736,7 +709,7 @@ impl State { } // if write buffer is smaller than high watermark value, turn off back-pressure - if buf.len() < self.0.write_hw.get() as usize { + if buf.len() < self.0.pool.get().write_params_high() { let mut flags = self.0.flags.get(); if flags.contains(Flags::WR_BACKPRESSURE) { flags.remove(Flags::WR_BACKPRESSURE); @@ -783,7 +756,8 @@ impl<'a> Write<'a> { /// Check if write buffer is full pub fn is_full(&self) -> bool { if let Some(buf) = self.0.read_buf.take() { - let result = buf.len() >= self.0.write_hw.get() as usize; + let hw = self.0.pool.get().write_params_high(); + let result = buf.len() >= hw; self.0.write_buf.set(Some(buf)); result } else { @@ -841,11 +815,12 @@ impl<'a> Write<'a> { if !flags.intersects(Flags::IO_ERR | Flags::IO_SHUTDOWN) { let mut buf = self.0.get_write_buf(); let is_write_sleep = buf.is_empty(); + let (hw, lw) = self.0.pool.get().write_params().unpack(); // make sure we've got room let remaining = buf.capacity() - buf.len(); - if remaining < self.0.lw.get() as usize { - buf.reserve((self.0.write_hw.get() as usize) - remaining); + if remaining < lw { + buf.reserve(hw - remaining); } // encode item and wake write task @@ -853,7 +828,7 @@ impl<'a> Write<'a> { if is_write_sleep { self.0.write_task.wake(); } - buf.len() < self.0.write_hw.get() as usize + buf.len() < hw }); self.0.write_buf.set(Some(buf)); result @@ -879,11 +854,12 @@ impl<'a> Write<'a> { Ok(Some(item)) => { let mut buf = self.0.get_write_buf(); let is_write_sleep = buf.is_empty(); + let (hw, lw) = self.0.pool.get().write_params().unpack(); // make sure we've got room let remaining = buf.capacity() - buf.len(); - if remaining < self.0.lw.get() as usize { - buf.reserve((self.0.write_hw.get() as usize) - remaining); + if remaining < lw { + buf.reserve(hw - remaining); } // encode item @@ -896,7 +872,7 @@ impl<'a> Write<'a> { } else if is_write_sleep { self.0.write_task.wake(); } - let result = Ok(buf.len() < self.0.write_hw.get() as usize); + let result = Ok(buf.len() < hw); self.0.write_buf.set(Some(buf)); result } @@ -927,7 +903,7 @@ impl<'a> Read<'a> { /// Check if read buffer is full pub fn is_full(&self) -> bool { if let Some(buf) = self.0.read_buf.take() { - let result = buf.len() >= self.0.read_hw.get() as usize; + let result = buf.len() >= self.0.pool.get().read_params_high(); self.0.read_buf.set(Some(buf)); result } else { @@ -983,13 +959,10 @@ impl<'a> Read<'a> { where U: Decoder, { - if let Some(mut buf) = self.0.read_buf.take() { - let result = codec.decode(&mut buf); - self.0.release_read_buf(buf); - result - } else { - codec.decode(&mut BytesMut::new()) - } + let mut buf = self.0.get_read_buf(); + let result = codec.decode(&mut buf); + self.0.release_read_buf(buf); + result } /// Get mut access to read buffer @@ -997,13 +970,10 @@ impl<'a> Read<'a> { where F: FnOnce(&mut BytesMut) -> R, { - if let Some(mut buf) = self.0.read_buf.take() { - let res = f(&mut buf); - self.0.release_read_buf(buf); - res - } else { - 
f(&mut BytesMut::new()) - } + let mut buf = self.0.get_read_buf(); + let res = f(&mut buf); + self.0.release_read_buf(buf); + res } } diff --git a/ntex/src/http/config.rs b/ntex/src/http/config.rs index cc8dcc4c..44e24a35 100644 --- a/ntex/src/http/config.rs +++ b/ntex/src/http/config.rs @@ -4,7 +4,7 @@ use crate::framed::Timer; use crate::http::{Request, Response}; use crate::service::boxed::BoxService; use crate::time::{sleep, Millis, Seconds, Sleep}; -use crate::util::BytesMut; +use crate::util::{BytesMut, PoolRef}; #[derive(Debug, PartialEq, Clone, Copy)] /// Server keep-alive setting @@ -87,6 +87,10 @@ impl ServiceConfig { }; let keep_alive = if ka_enabled { keep_alive } else { Millis::ZERO }; + PoolRef::default() + .set_read_params(read_hw, lw) + .set_write_params(write_hw, lw); + ServiceConfig(Rc::new(Inner { keep_alive, ka_enabled, diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index ebfbe1c5..cfeafc8a 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -992,7 +992,9 @@ mod tests { let mut h1 = h1(server, |_| { Box::pin(async { Ok::<_, io::Error>(Response::Ok().finish()) }) }); - h1.inner.state.set_buffer_params(15 * 1024, 15 * 1024, 1024); + crate::util::PoolRef::default() + .set_read_params(15 * 1024, 1024) + .set_write_params(15 * 1024, 1024); let mut decoder = ClientCodec::default(); diff --git a/ntex/src/util/mod.rs b/ntex/src/util/mod.rs index 33c0b0af..d989a98b 100644 --- a/ntex/src/util/mod.rs +++ b/ntex/src/util/mod.rs @@ -10,7 +10,7 @@ pub mod variant; pub use self::extensions::Extensions; -pub use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut}; +pub use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut, Pool, PoolId, PoolRef}; pub use ntex_util::future::*; pub type HashMap = std::collections::HashMap; diff --git a/ntex/src/ws/frame.rs b/ntex/src/ws/frame.rs index ad5e395a..9f1028ac 100644 --- a/ntex/src/ws/frame.rs +++ b/ntex/src/ws/frame.rs @@ -242,9 +242,7 @@ mod tests { Ok(Some((finished, opcode, payload))) => F { finished, opcode, - payload: payload - .map(|b| b.freeze()) - .unwrap_or_else(|| Bytes::from("")), + payload: payload.map(|b| b.freeze()).unwrap_or_else(Bytes::new), }, _ => unreachable!("error"), } diff --git a/ntex/src/ws/stream.rs b/ntex/src/ws/stream.rs index 62c1308b..3d9a482a 100644 --- a/ntex/src/ws/stream.rs +++ b/ntex/src/ws/stream.rs @@ -64,7 +64,7 @@ where loop { if !this.buf.is_empty() { - match this.codec.decode(&mut this.buf) { + match this.codec.decode(this.buf) { Ok(Some(item)) => return Poll::Ready(Some(Ok(item))), Ok(None) => (), Err(err) => return Poll::Ready(Some(Err(err.into()))), diff --git a/ntex/tests/http_awc_rustls_client.rs b/ntex/tests/http_awc_rustls_client.rs index 057eef40..b1e4f286 100644 --- a/ntex/tests/http_awc_rustls_client.rs +++ b/ntex/tests/http_awc_rustls_client.rs @@ -49,7 +49,7 @@ mod danger { _scts: &mut dyn Iterator, _ocsp_response: &[u8], _now: SystemTime, - ) -> Result { + ) -> Result { Ok(rust_tls::client::ServerCertVerified::assertion()) } }
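
A minimal usage sketch of the per-pool accounting this diff adds to ntex-bytes 0.1.5, using only names that appear in the changes above (`PoolId`, `PoolRef`, `BytesMut::with_capacity_in`, `PoolRef::allocated`, `PoolRef::move_in`). It is illustrative, not part of the patch; exact counts also include the shared header, so the assertions only check lower bounds.

```
use ntex_bytes::{BytesMut, PoolId};

fn main() {
    // Every pool id exposes a lightweight, copyable handle.
    let p1 = PoolId::P1.pool_ref();
    assert_eq!(p1.allocated(), 0);

    // Buffers created through `with_capacity_in` are charged to that pool.
    let mut buf = BytesMut::with_capacity_in(1024, p1);
    assert!(p1.allocated() >= 1024);

    buf.reserve(2048);
    assert!(p1.allocated() >= 2048);

    // Dropping the buffer releases its charge back to the pool counter.
    drop(buf);
    assert_eq!(p1.allocated(), 0);

    // A buffer charged to the default pool can be re-attached to another pool.
    let mut buf = BytesMut::with_capacity(1024); // charged to PoolId::DEFAULT
    p1.move_in(&mut buf);
    assert!(p1.allocated() >= 1024);
}
```

Judging by the `pool` test above, `move_in` only transfers the accounted size from the default pool to the target pool; the buffer contents stay where they are.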
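
The framed `State` changes drive back-pressure through `Pool::poll_ready`. Below is a hedged sketch of that flow, modeled on the `pool_usage` test in this diff and assuming only the API shown here (`PoolId::set_spawn_fn`, `set_pool_size`, `Pool::poll_ready`, `Pool::is_pending`); the 10 KB limit is an arbitrary example value.

```
use std::task::Poll;

use ntex::util::lazy;
use ntex_bytes::{BytesMut, PoolId};

#[ntex::test]
async fn pool_backpressure_sketch() {
    // Waiters are woken from a driver task, so a spawn function must be set first.
    PoolId::set_spawn_fn(|f| {
        let _ = ntex::rt::spawn(f);
    });

    // Cap pool P1 at 10 KB (example value) and take a `Pool` handle for readiness checks.
    let pref = PoolId::P1.pool_ref().set_pool_size(10 * 1024);
    let pool = pref.pool();

    // Nothing allocated yet: the pool reports ready.
    assert_eq!(Poll::Ready(()), lazy(|cx| pool.poll_ready(cx)).await);

    // Allocating past the limit makes later readiness checks return Pending
    // and registers the caller as a waiter.
    let buf = BytesMut::with_capacity_in(11 * 1024, pref);
    assert_eq!(Poll::Pending, lazy(|cx| pool.poll_ready(cx)).await);
    assert!(pool.is_pending());

    // Releasing the buffer lets the driver task wake pending waiters again.
    drop(buf);
}
```

This mirrors how the framed dispatcher uses its `Pool` handle: when `poll_ready` returns `Pending`, the read side is paused until the pool's driver observes the allocation dropping back below the window.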