From a2a5899bbe74dc27959f85374ba36ea7eef97ba6 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 29 Jan 2022 23:53:13 +0600 Subject: [PATCH] Add BytesVec type --- ntex-bytes/CHANGELOG.md | 4 + ntex-bytes/Cargo.toml | 2 +- ntex-bytes/src/bytes.rs | 1304 +++++++++++++++++++++++++++++++- ntex-bytes/src/lib.rs | 4 +- ntex-bytes/src/pool.rs | 23 +- ntex-bytes/tests/test_bytes.rs | 238 +++++- ntex-service/src/pipeline.rs | 56 -- 7 files changed, 1524 insertions(+), 107 deletions(-) diff --git a/ntex-bytes/CHANGELOG.md b/ntex-bytes/CHANGELOG.md index b198e02b..cf2786b5 100644 --- a/ntex-bytes/CHANGELOG.md +++ b/ntex-bytes/CHANGELOG.md @@ -1,5 +1,9 @@ # Changes +## [0.1.11] (2022-01-30) + +* Add BytesVec type + ## [0.1.10] (2022-01-26) * Rename Pool::is_pending() to is_ready() diff --git a/ntex-bytes/Cargo.toml b/ntex-bytes/Cargo.toml index 8530bb9a..d93d2082 100644 --- a/ntex-bytes/Cargo.toml +++ b/ntex-bytes/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-bytes" -version = "0.1.10" +version = "0.1.11" license = "MIT" authors = ["Nikolay Kim ", "Carl Lerche "] description = "Types and traits for working with bytes (bytes crate fork)" diff --git a/ntex-bytes/src/bytes.rs b/ntex-bytes/src/bytes.rs index e1e3fb6e..26957e83 100644 --- a/ntex-bytes/src/bytes.rs +++ b/ntex-bytes/src/bytes.rs @@ -151,6 +151,52 @@ pub struct BytesMut { inner: Inner, } +/// A unique reference to a contiguous slice of memory. +/// +/// `BytesVec` represents a unique view into a potentially shared memory region. +/// Given the uniqueness guarantee, owners of `BytesVec` handles are able to +/// mutate the memory. It is similar to a `Vec` but with less copies and +/// allocations. It also always allocates. +/// +/// For more detail, see [Bytes](struct.Bytes.html). +/// +/// # Growth +/// +/// One key difference from `Vec` is that most operations **do not +/// implicitly grow the buffer**. This means that calling `my_bytes.put("hello +/// world");` could panic if `my_bytes` does not have enough capacity. Before +/// writing to the buffer, ensure that there is enough remaining capacity by +/// calling `my_bytes.remaining_mut()`. In general, avoiding calls to `reserve` +/// is preferable. +/// +/// The only exception is `extend` which implicitly reserves required capacity. +/// +/// # Examples +/// +/// ``` +/// use ntex_bytes::{BytesVec, BufMut}; +/// +/// let mut buf = BytesVec::with_capacity(64); +/// +/// buf.put_u8(b'h'); +/// buf.put_u8(b'e'); +/// buf.put("llo"); +/// +/// assert_eq!(&buf[..], b"hello"); +/// +/// // Freeze the buffer so that it can be shared +/// let a = buf.freeze(); +/// +/// // This does not allocate, instead `b` points to the same memory. +/// let b = a.clone(); +/// +/// assert_eq!(&a[..], b"hello"); +/// assert_eq!(&b[..], b"hello"); +/// ``` +pub struct BytesVec { + inner: InnerVec, +} + // Both `Bytes` and `BytesMut` are backed by `Inner` and functions are delegated // to `Inner` functions. The `Bytes` and `BytesMut` shims ensure that functions // that mutate the underlying buffer are only performed when the data range @@ -331,6 +377,8 @@ struct Shared { struct SharedVec { cap: usize, + len: u32, + offset: u32, ref_count: AtomicUsize, pool: PoolRef, } @@ -716,7 +764,7 @@ impl Bytes { } } else { Bytes { - inner: BytesMut::copy_from_slice_in_priv(self, self.inner.pool()).inner, + inner: BytesMut::copy_from_slice_in(self, self.inner.pool()).inner, } }; } @@ -930,6 +978,21 @@ impl FromIterator for BytesMut { } } +impl FromIterator for BytesVec { + fn from_iter>(into_iter: T) -> Self { + let iter = into_iter.into_iter(); + let (min, maybe_max) = iter.size_hint(); + + let mut out = BytesVec::with_capacity(maybe_max.unwrap_or(min)); + for i in iter { + out.reserve(1); + out.put_u8(i); + } + + out + } +} + impl FromIterator for Bytes { fn from_iter>(into_iter: T) -> Self { BytesMut::from_iter(into_iter).freeze() @@ -942,6 +1005,12 @@ impl<'a> FromIterator<&'a u8> for BytesMut { } } +impl<'a> FromIterator<&'a u8> for BytesVec { + fn from_iter>(into_iter: T) -> Self { + into_iter.into_iter().copied().collect::() + } +} + impl<'a> FromIterator<&'a u8> for Bytes { fn from_iter>(into_iter: T) -> Self { BytesMut::from_iter(into_iter).freeze() @@ -1094,15 +1163,9 @@ impl BytesMut { where PoolRef: From, { - let mut bytes = BytesMut::with_capacity_in(src.len(), pool); - bytes.extend_from_slice(src); - bytes - } - - fn copy_from_slice_in_priv(src: &[u8], pool: PoolRef) -> Self { - let mut bytes = BytesMut::with_capacity_in_priv(src.len(), pool); - bytes.extend_from_slice(src); - bytes + BytesMut { + inner: Inner::from_slice(src.len(), src, pool.into()), + } } #[inline] @@ -1178,7 +1241,7 @@ impl BytesMut { /// use ntex_bytes::BytesMut; /// /// let b = BytesMut::with_capacity(64); - /// assert_eq!(b.capacity(), 64); + /// assert_eq!(b.capacity(), 96); /// ``` #[inline] pub fn capacity(&self) -> usize { @@ -1273,7 +1336,7 @@ impl BytesMut { /// let other = buf.split(); /// /// assert!(buf.is_empty()); - /// assert_eq!(1013, buf.capacity()); + /// assert_eq!(1045, buf.capacity()); /// /// assert_eq!(other, b"hello world"[..]); /// ``` @@ -1465,12 +1528,12 @@ impl BytesMut { /// let other = buf.split(); /// /// assert!(buf.is_empty()); - /// assert_eq!(buf.capacity(), 64); + /// assert_eq!(buf.capacity(), 96); /// /// drop(other); /// buf.reserve(128); /// - /// assert_eq!(buf.capacity(), 128); + /// assert_eq!(buf.capacity(), 160); /// assert_eq!(buf.as_ptr(), ptr); /// ``` /// @@ -1685,9 +1748,8 @@ impl<'a> From<&'a str> for BytesMut { impl From for BytesMut { #[inline] fn from(src: Bytes) -> BytesMut { - src.try_mut().unwrap_or_else(|src| { - BytesMut::copy_from_slice_in_priv(&src[..], src.inner.pool()) - }) + src.try_mut() + .unwrap_or_else(|src| BytesMut::copy_from_slice_in(&src[..], src.inner.pool())) } } @@ -1821,6 +1883,1027 @@ impl<'a> Extend<&'a u8> for BytesMut { } } +/* + * + * ===== BytesVec ===== + * + */ + +impl BytesVec { + /// Creates a new `BytesVec` with the specified capacity. + /// + /// The returned `BytesVec` will be able to hold at least `capacity` bytes + /// without reallocating. + /// + /// It is important to note that this function does not specify the length + /// of the returned `BytesVec`, but only the capacity. + /// + /// # Panics + /// + /// Panics if `capacity` greater than 60bit for 64bit systems + /// and 28bit for 32bit systems + /// + /// # Examples + /// + /// ``` + /// use ntex_bytes::{BytesVec, BufMut}; + /// + /// let mut bytes = BytesVec::with_capacity(64); + /// + /// // `bytes` contains no data, even though there is capacity + /// assert_eq!(bytes.len(), 0); + /// + /// bytes.put(&b"hello world"[..]); + /// + /// assert_eq!(&bytes[..], b"hello world"); + /// ``` + #[inline] + pub fn with_capacity(capacity: usize) -> BytesVec { + Self::with_capacity_in(capacity, PoolId::DEFAULT.pool_ref()) + } + + /// Creates a new `BytesVec` with the specified capacity and in specified memory pool. + /// + /// # Examples + /// + /// ``` + /// use ntex_bytes::{BytesVec, BufMut, PoolId}; + /// + /// let mut bytes = BytesVec::with_capacity_in(64, PoolId::P1); + /// + /// // `bytes` contains no data, even though there is capacity + /// assert_eq!(bytes.len(), 0); + /// + /// bytes.put(&b"hello world"[..]); + /// + /// assert_eq!(&bytes[..], b"hello world"); + /// assert!(PoolId::P1.pool_ref().allocated() > 0); + /// ``` + #[inline] + pub fn with_capacity_in(capacity: usize, pool: T) -> BytesVec + where + PoolRef: From, + { + BytesVec { + inner: InnerVec::with_capacity(capacity, pool.into()), + } + } + + #[inline] + pub(crate) fn with_capacity_in_priv(capacity: usize, pool: PoolRef) -> BytesVec { + BytesVec { + inner: InnerVec::with_capacity(capacity, pool), + } + } + + /// Creates a new `BytesVec` from slice, by copying it. + pub fn copy_from_slice(src: &[u8]) -> Self { + Self::copy_from_slice_in(src, PoolId::DEFAULT) + } + + /// Creates a new `BytesVec` from slice, by copying it. + pub fn copy_from_slice_in(src: &[u8], pool: T) -> Self + where + PoolRef: From, + { + BytesVec { + inner: InnerVec::from_slice(src.len(), src, pool.into()), + } + } + + /// Creates a new `BytesVec` with default capacity. + /// + /// Resulting object has length 0 and unspecified capacity. + /// This function does not allocate. + /// + /// # Examples + /// + /// ``` + /// use ntex_bytes::{BytesVec, BufMut}; + /// + /// let mut bytes = BytesVec::new(); + /// + /// assert_eq!(0, bytes.len()); + /// + /// bytes.reserve(2); + /// bytes.put_slice(b"xy"); + /// + /// assert_eq!(&b"xy"[..], &bytes[..]); + /// ``` + #[inline] + pub fn new() -> BytesVec { + BytesVec::with_capacity(MIN_NON_ZERO_CAP) + } + + /// Returns the number of bytes contained in this `BytesVec`. + /// + /// # Examples + /// + /// ``` + /// use ntex_bytes::BytesVec; + /// + /// let b = BytesVec::copy_from_slice(&b"hello"[..]); + /// assert_eq!(b.len(), 5); + /// ``` + #[inline] + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Returns true if the `BytesVec` has a length of 0. + /// + /// # Examples + /// + /// ``` + /// use ntex_bytes::BytesVec; + /// + /// let b = BytesVec::with_capacity(64); + /// assert!(b.is_empty()); + /// ``` + #[inline] + pub fn is_empty(&self) -> bool { + self.inner.len() == 0 + } + + /// Returns the number of bytes the `BytesVec` can hold without reallocating. + /// + /// # Examples + /// + /// ``` + /// use ntex_bytes::BytesVec; + /// + /// let b = BytesVec::with_capacity(64); + /// assert_eq!(b.capacity(), 96); + /// ``` + #[inline] + pub fn capacity(&self) -> usize { + self.inner.capacity() + } + + /// Converts `self` into an immutable `Bytes`. + /// + /// The conversion is zero cost and is used to indicate that the slice + /// referenced by the handle will no longer be mutated. Once the conversion + /// is done, the handle can be cloned and shared across threads. + /// + /// # Examples + /// + /// ``` + /// use ntex_bytes::{BytesVec, BufMut}; + /// use std::thread; + /// + /// let mut b = BytesVec::with_capacity(64); + /// b.put("hello world"); + /// let b1 = b.freeze(); + /// let b2 = b1.clone(); + /// + /// let th = thread::spawn(move || { + /// assert_eq!(&b1[..], b"hello world"); + /// }); + /// + /// assert_eq!(&b2[..], b"hello world"); + /// th.join().unwrap(); + /// ``` + #[inline] + pub fn freeze(self) -> Bytes { + Bytes { + inner: self.inner.into_inner(), + } + } + + /// Removes the bytes from the current view, returning them in a new + /// `Bytes` instance. + /// + /// Afterwards, `self` will be empty, but will retain any additional + /// capacity that it had before the operation. This is identical to + /// `self.split_to(self.len())`. + /// + /// This is an `O(1)` operation that just increases the reference count and + /// sets a few indices. + /// + /// # Examples + /// + /// ``` + /// use ntex_bytes::{BytesVec, BufMut}; + /// + /// let mut buf = BytesVec::with_capacity(1024); + /// buf.put(&b"hello world"[..]); + /// + /// let other = buf.split(); + /// + /// assert!(buf.is_empty()); + /// assert_eq!(1045, buf.capacity()); + /// + /// assert_eq!(other, b"hello world"[..]); + /// ``` + pub fn split(&mut self) -> Bytes { + // let len = self.len(); + self.split_to(self.len()) + } + + /// Splits the buffer into two at the given index. + /// + /// Afterwards `self` contains elements `[at, len)`, and the returned `Bytes` + /// contains elements `[0, at)`. + /// + /// This is an `O(1)` operation that just increases the reference count and + /// sets a few indices. + /// + /// # Examples + /// + /// ``` + /// use ntex_bytes::BytesVec; + /// + /// let mut a = BytesVec::copy_from_slice(&b"hello world"[..]); + /// let mut b = a.split_to(5); + /// + /// a[0] = b'!'; + /// + /// assert_eq!(&a[..], b"!world"); + /// assert_eq!(&b[..], b"hello"); + /// ``` + /// + /// # Panics + /// + /// Panics if `at > len`. + pub fn split_to(&mut self, at: usize) -> Bytes { + assert!(at <= self.len()); + + Bytes { + inner: self.inner.split_to(at, false), + } + } + + /// Shortens the buffer, keeping the first `len` bytes and dropping the + /// rest. + /// + /// If `len` is greater than the buffer's current length, this has no + /// effect. + /// + /// The [`split_off`] method can emulate `truncate`, but this causes the + /// excess bytes to be returned instead of dropped. + /// + /// # Examples + /// + /// ``` + /// use ntex_bytes::BytesVec; + /// + /// let mut buf = BytesVec::copy_from_slice(&b"hello world"[..]); + /// buf.truncate(5); + /// assert_eq!(buf, b"hello"[..]); + /// ``` + /// + /// [`split_off`]: #method.split_off + pub fn truncate(&mut self, len: usize) { + self.inner.truncate(len); + } + + /// Clears the buffer, removing all data. + /// + /// # Examples + /// + /// ``` + /// use ntex_bytes::BytesVec; + /// + /// let mut buf = BytesVec::copy_from_slice(&b"hello world"[..]); + /// buf.clear(); + /// assert!(buf.is_empty()); + /// ``` + pub fn clear(&mut self) { + self.truncate(0); + } + + /// Resizes the buffer so that `len` is equal to `new_len`. + /// + /// If `new_len` is greater than `len`, the buffer is extended by the + /// difference with each additional byte set to `value`. If `new_len` is + /// less than `len`, the buffer is simply truncated. + /// + /// # Panics + /// + /// Panics if `new_len` greater than 60bit for 64bit systems + /// and 28bit for 32bit systems + /// + /// # Examples + /// + /// ``` + /// use ntex_bytes::BytesVec; + /// + /// let mut buf = BytesVec::new(); + /// + /// buf.resize(3, 0x1); + /// assert_eq!(&buf[..], &[0x1, 0x1, 0x1]); + /// + /// buf.resize(2, 0x2); + /// assert_eq!(&buf[..], &[0x1, 0x1]); + /// + /// buf.resize(4, 0x3); + /// assert_eq!(&buf[..], &[0x1, 0x1, 0x3, 0x3]); + /// ``` + #[inline] + pub fn resize(&mut self, new_len: usize, value: u8) { + self.inner.resize(new_len, value); + } + + /// Sets the length of the buffer. + /// + /// This will explicitly set the size of the buffer without actually + /// modifying the data, so it is up to the caller to ensure that the data + /// has been initialized. + /// + /// # Examples + /// + /// ``` + /// use ntex_bytes::BytesVec; + /// + /// let mut b = BytesVec::copy_from_slice(&b"hello world"[..]); + /// + /// unsafe { + /// b.set_len(5); + /// } + /// + /// assert_eq!(&b[..], b"hello"); + /// + /// unsafe { + /// b.set_len(11); + /// } + /// + /// assert_eq!(&b[..], b"hello world"); + /// ``` + /// + /// # Panics + /// + /// This method will panic if `len` is out of bounds for the underlying + /// slice or if it comes after the `end` of the configured window. + #[inline] + #[allow(clippy::missing_safety_doc)] + pub unsafe fn set_len(&mut self, len: usize) { + self.inner.set_len(len) + } + + /// Reserves capacity for at least `additional` more bytes to be inserted + /// into the given `BytesVec`. + /// + /// More than `additional` bytes may be reserved in order to avoid frequent + /// reallocations. A call to `reserve` may result in an allocation. + /// + /// Before allocating new buffer space, the function will attempt to reclaim + /// space in the existing buffer. If the current handle references a small + /// view in the original buffer and all other handles have been dropped, + /// and the requested capacity is less than or equal to the existing + /// buffer's capacity, then the current view will be copied to the front of + /// the buffer and the handle will take ownership of the full buffer. + /// + /// # Panics + /// + /// Panics if new capacity is greater than 60bit for 64bit systems + /// and 28bit for 32bit systems + /// + /// # Examples + /// + /// In the following example, a new buffer is allocated. + /// + /// ``` + /// use ntex_bytes::BytesVec; + /// + /// let mut buf = BytesVec::copy_from_slice(&b"hello"[..]); + /// buf.reserve(64); + /// assert!(buf.capacity() >= 69); + /// ``` + /// + /// In the following example, the existing buffer is reclaimed. + /// + /// ``` + /// use ntex_bytes::{BytesVec, BufMut}; + /// + /// let mut buf = BytesVec::with_capacity(128); + /// buf.put(&[0; 64][..]); + /// + /// let ptr = buf.as_ptr(); + /// let other = buf.split(); + /// + /// assert!(buf.is_empty()); + /// assert_eq!(buf.capacity(), 96); + /// + /// drop(other); + /// buf.reserve(128); + /// + /// assert_eq!(buf.capacity(), 160); + /// assert_eq!(buf.as_ptr(), ptr); + /// ``` + /// + /// # Panics + /// + /// Panics if the new capacity overflows `usize`. + #[inline] + pub fn reserve(&mut self, additional: usize) { + let len = self.len(); + let rem = self.capacity() - len; + + if additional <= rem { + // The handle can already store at least `additional` more bytes, so + // there is no further work needed to be done. + return; + } + + self.inner.reserve_inner(additional); + } + + /// Appends given bytes to this object. + /// + /// If this `BytesVec` object has not enough capacity, it is resized first. + /// So unlike `put_slice` operation, `extend_from_slice` does not panic. + /// + /// # Examples + /// + /// ``` + /// use ntex_bytes::BytesVec; + /// + /// let mut buf = BytesVec::with_capacity(0); + /// buf.extend_from_slice(b"aaabbb"); + /// buf.extend_from_slice(b"cccddd"); + /// + /// assert_eq!(b"aaabbbcccddd", &buf[..]); + /// ``` + #[inline] + pub fn extend_from_slice(&mut self, extend: &[u8]) { + self.put_slice(extend); + } + + /// Run provided function with `BytesMut` instance that contains current data. + #[inline] + pub fn with_bytes_mut(&mut self, f: F) -> R + where + F: FnOnce(&mut BytesMut) -> R, + { + self.inner.with_bytes_mut(f) + } + + /// Returns an iterator over the bytes contained by the buffer. + /// + /// # Examples + /// + /// ``` + /// use ntex_bytes::{Buf, BytesVec}; + /// + /// let buf = BytesVec::copy_from_slice(&b"abc"[..]); + /// let mut iter = buf.iter(); + /// + /// assert_eq!(iter.next().map(|b| *b), Some(b'a')); + /// assert_eq!(iter.next().map(|b| *b), Some(b'b')); + /// assert_eq!(iter.next().map(|b| *b), Some(b'c')); + /// assert_eq!(iter.next(), None); + /// ``` + #[inline] + pub fn iter(&'_ self) -> std::slice::Iter<'_, u8> { + self.chunk().iter() + } + + pub(crate) fn move_to_pool(&mut self, pool: PoolRef) { + self.inner.move_to_pool(pool); + } +} + +impl Buf for BytesVec { + #[inline] + fn remaining(&self) -> usize { + self.len() + } + + #[inline] + fn chunk(&self) -> &[u8] { + self.inner.as_ref() + } + + #[inline] + fn advance(&mut self, cnt: usize) { + assert!( + cnt <= self.inner.as_ref().len(), + "cannot advance past `remaining`" + ); + unsafe { + self.inner.set_start(cnt as u32); + } + } +} + +impl BufMut for BytesVec { + #[inline] + fn remaining_mut(&self) -> usize { + self.capacity() - self.len() + } + + #[inline] + unsafe fn advance_mut(&mut self, cnt: usize) { + let new_len = self.len() + cnt; + + // This call will panic if `cnt` is too big + self.inner.set_len(new_len); + } + + #[inline] + fn chunk_mut(&mut self) -> &mut UninitSlice { + let len = self.len(); + + unsafe { + // This will never panic as `len` can never become invalid + let ptr = &mut self.inner.as_raw()[len..]; + + UninitSlice::from_raw_parts_mut(ptr.as_mut_ptr(), self.capacity() - len) + } + } + + #[inline] + fn put_slice(&mut self, src: &[u8]) { + let len = src.len(); + self.reserve(len); + + unsafe { + ptr::copy_nonoverlapping( + src.as_ptr(), + self.chunk_mut().as_mut_ptr() as *mut u8, + len, + ); + self.advance_mut(len); + } + } + + #[inline] + fn put_u8(&mut self, n: u8) { + self.reserve(1); + self.inner.put_u8(n); + } + + #[inline] + fn put_i8(&mut self, n: i8) { + self.reserve(1); + self.put_u8(n as u8); + } +} + +impl AsRef<[u8]> for BytesVec { + #[inline] + fn as_ref(&self) -> &[u8] { + self.inner.as_ref() + } +} + +impl AsMut<[u8]> for BytesVec { + #[inline] + fn as_mut(&mut self) -> &mut [u8] { + self.inner.as_mut() + } +} + +impl Deref for BytesVec { + type Target = [u8]; + + #[inline] + fn deref(&self) -> &[u8] { + self.as_ref() + } +} + +impl DerefMut for BytesVec { + #[inline] + fn deref_mut(&mut self) -> &mut [u8] { + self.inner.as_mut() + } +} + +impl Eq for BytesVec {} + +impl PartialEq for BytesVec { + #[inline] + fn eq(&self, other: &BytesVec) -> bool { + unsafe { ptr::eq(self.inner.as_ptr(), other.inner.as_ptr()) } + } +} + +impl Ord for BytesVec { + #[inline] + fn cmp(&self, other: &BytesVec) -> cmp::Ordering { + self.inner.as_ref().cmp(other.inner.as_ref()) + } +} + +impl PartialOrd for BytesVec { + #[inline] + fn partial_cmp(&self, other: &BytesVec) -> Option { + self.inner.as_ref().partial_cmp(other.inner.as_ref()) + } +} + +impl Default for BytesVec { + #[inline] + fn default() -> BytesVec { + BytesVec::new() + } +} + +impl fmt::Debug for BytesVec { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&debug::BsDebug(self.inner.as_ref()), fmt) + } +} + +impl hash::Hash for BytesVec { + #[inline] + fn hash(&self, state: &mut H) + where + H: hash::Hasher, + { + let s: &[u8] = self.as_ref(); + s.hash(state); + } +} + +impl Borrow<[u8]> for BytesVec { + #[inline] + fn borrow(&self) -> &[u8] { + self.as_ref() + } +} + +impl BorrowMut<[u8]> for BytesVec { + #[inline] + fn borrow_mut(&mut self) -> &mut [u8] { + self.as_mut() + } +} + +impl fmt::Write for BytesVec { + #[inline] + fn write_str(&mut self, s: &str) -> fmt::Result { + if self.remaining_mut() >= s.len() { + self.put_slice(s.as_bytes()); + Ok(()) + } else { + Err(fmt::Error) + } + } + + #[inline] + fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result { + fmt::write(self, args) + } +} + +impl<'a> IntoIterator for &'a BytesVec { + type Item = &'a u8; + type IntoIter = std::slice::Iter<'a, u8>; + + fn into_iter(self) -> Self::IntoIter { + self.as_ref().iter() + } +} + +impl Extend for BytesVec { + fn extend(&mut self, iter: T) + where + T: IntoIterator, + { + let iter = iter.into_iter(); + + let (lower, _) = iter.size_hint(); + self.reserve(lower); + + for b in iter { + self.put_u8(b); + } + } +} + +impl<'a> Extend<&'a u8> for BytesVec { + fn extend(&mut self, iter: T) + where + T: IntoIterator, + { + self.extend(iter.into_iter().copied()) + } +} + +struct InnerVec(NonNull); + +impl InnerVec { + #[inline] + fn with_capacity(capacity: usize, pool: PoolRef) -> InnerVec { + Self::from_slice(capacity, &[], pool) + } + + #[inline] + fn from_slice(cap: usize, src: &[u8], pool: PoolRef) -> InnerVec { + // TODO: vec must be aligned to SharedVec instead of u8 + let mut vec = Vec::::with_capacity((cap / SHARED_VEC_SIZE) + 2); + #[allow(clippy::uninit_vec)] + unsafe { + // Store data in vec + let len = src.len() as u32; + let cap = vec.capacity() * SHARED_VEC_SIZE; + let shared_ptr = vec.as_mut_ptr(); + mem::forget(vec); + pool.acquire(cap); + + let ptr = shared_ptr.add(1) as *mut u8; + if !src.is_empty() { + ptr::copy_nonoverlapping(src.as_ptr(), ptr, src.len()); + } + ptr::write( + shared_ptr, + SharedVec { + len, + cap, + pool, + ref_count: AtomicUsize::new(1), + offset: SHARED_VEC_SIZE as u32, + }, + ); + + InnerVec(NonNull::new_unchecked(shared_ptr)) + } + } + + #[inline] + fn move_to_pool(&mut self, pool: PoolRef) { + unsafe { + let inner = self.as_inner(); + if pool != inner.pool { + pool.acquire(inner.cap); + let pool = mem::replace(&mut inner.pool, pool); + pool.release(inner.cap); + } + } + } + + /// Return a slice for the handle's view into the shared buffer + #[inline] + fn as_ref(&self) -> &[u8] { + unsafe { slice::from_raw_parts(self.as_ptr(), self.len()) } + } + + /// Return a mutable slice for the handle's view into the shared buffer + #[inline] + fn as_mut(&mut self) -> &mut [u8] { + unsafe { slice::from_raw_parts_mut(self.as_ptr(), self.len()) } + } + + /// Return a mutable slice for the handle's view into the shared buffer + /// including potentially uninitialized bytes. + #[inline] + unsafe fn as_raw(&mut self) -> &mut [u8] { + slice::from_raw_parts_mut(self.as_ptr(), self.capacity()) + } + + /// Return a raw pointer to data + #[inline] + unsafe fn as_ptr(&self) -> *mut u8 { + (self.0.as_ptr() as *mut u8).add((*self.0.as_ptr()).offset as usize) + } + + #[inline] + unsafe fn as_inner(&mut self) -> &mut SharedVec { + self.0.as_mut() + } + + /// Insert a byte into the next slot and advance the len by 1. + #[inline] + fn put_u8(&mut self, n: u8) { + unsafe { + let inner = self.as_inner(); + let len = inner.len as usize; + assert!(len < (inner.cap - inner.offset as usize)); + inner.len += 1; + *self.as_ptr().add(len) = n; + } + } + + #[inline] + fn len(&self) -> usize { + unsafe { (*self.0.as_ptr()).len as usize } + } + + /// slice. + #[inline] + unsafe fn set_len(&mut self, len: usize) { + let inner = self.as_inner(); + assert!(len <= (inner.cap - inner.offset as usize) && len < u32::MAX as usize); + inner.len = len as u32; + } + + #[inline] + fn capacity(&self) -> usize { + unsafe { (*self.0.as_ptr()).cap - (*self.0.as_ptr()).offset as usize } + } + + fn into_inner(mut self) -> Inner { + unsafe { + let ptr = self.as_ptr(); + + if self.len() <= INLINE_CAP { + Inner::from_ptr_inline(ptr, self.len()) + } else { + let inner = self.as_inner(); + + let inner = Inner { + ptr, + len: inner.len as usize, + cap: inner.cap - inner.offset as usize, + arc: NonNull::new_unchecked( + (self.0.as_ptr() as usize ^ KIND_VEC) as *mut Shared, + ), + }; + mem::forget(self); + inner + } + } + } + + fn with_bytes_mut(&mut self, f: F) -> R + where + F: FnOnce(&mut BytesMut) -> R, + { + unsafe { + // create Inner for BytesMut + let ptr = self.as_ptr(); + let inner = self.as_inner(); + let inner = Inner { + ptr, + len: inner.len as usize, + cap: inner.cap - inner.offset as usize, + arc: NonNull::new_unchecked( + (self.0.as_ptr() as usize ^ KIND_VEC) as *mut Shared, + ), + }; + + // run function + let mut buf = BytesMut { inner }; + let result = f(&mut buf); + + // convert BytesMut back to InnerVec + let kind = buf.inner.kind(); + let new_inner = + // only KIND_VEC could be converted to self, otherwise we have to copy data + if kind == KIND_INLINE || kind == KIND_STATIC || kind == KIND_ARC { + InnerVec::from_slice( + buf.inner.capacity(), + buf.inner.as_ref(), + buf.inner.pool(), + ) + } else if kind == KIND_VEC { + let ptr = buf.inner.shared_vec(); + let offset = buf.inner.arc.as_ptr() as usize - ptr as usize; + if buf.inner.cap < (*ptr).cap - offset { + InnerVec::from_slice( + buf.inner.capacity(), + buf.inner.as_ref(), + buf.inner.pool(), + ) + } else { + (*ptr).len = buf.len() as u32; + (*ptr).offset = offset as u32; + let inner = InnerVec(NonNull::new_unchecked(ptr)); + // reuse bytes + mem::forget(buf); + inner + } + } else { + panic!() + }; + + // drop old inner, we cannot drop because BytesMut used it + let old = mem::replace(self, new_inner); + mem::forget(old); + + result + } + } + + fn split_to(&mut self, at: usize, create_inline: bool) -> Inner { + let other = unsafe { + let ptr = self.as_ptr(); + + if create_inline && at <= INLINE_CAP { + Inner::from_ptr_inline(ptr, at) + } else { + let inner = self.as_inner(); + let old_size = inner.ref_count.fetch_add(1, Relaxed); + if old_size == usize::MAX { + abort(); + } + + Inner { + ptr, + len: at, + cap: at, + arc: NonNull::new_unchecked( + (self.0.as_ptr() as usize ^ KIND_VEC) as *mut Shared, + ), + } + } + }; + unsafe { + self.set_start(at as u32); + } + + other + } + + fn truncate(&mut self, len: usize) { + unsafe { + if len <= self.len() { + self.set_len(len); + } + } + } + + fn resize(&mut self, new_len: usize, value: u8) { + let len = self.len(); + if new_len > len { + let additional = new_len - len; + self.reserve(additional); + unsafe { + let dst = self.as_raw()[len..].as_mut_ptr(); + ptr::write_bytes(dst, value, additional); + self.set_len(new_len); + } + } else { + self.truncate(new_len); + } + } + + #[inline] + fn reserve(&mut self, additional: usize) { + let len = self.len(); + let rem = self.capacity() - len; + + if additional <= rem { + // The handle can already store at least `additional` more bytes, so + // there is no further work needed to be done. + return; + } + + self.reserve_inner(additional) + } + + #[inline] + // In separate function to allow the short-circuits in `reserve` to + // be inline-able. Significant helps performance. + fn reserve_inner(&mut self, additional: usize) { + let len = self.len(); + + // Reserving involves abandoning the currently shared buffer and + // allocating a new vector with the requested capacity. + let new_cap = len + additional; + + unsafe { + let inner = self.as_inner(); + let vec_cap = inner.cap - SHARED_VEC_SIZE; + + // try to reclaim the buffer. This is possible if the current + // handle is the only outstanding handle pointing to the buffer. + if inner.is_unique() && vec_cap >= new_cap { + inner.offset = SHARED_VEC_SIZE as u32; + + // The capacity is sufficient, reclaim the buffer + let ptr = (self.0.as_ptr() as *mut u8).add(SHARED_VEC_SIZE); + ptr::copy(self.as_ptr(), ptr, len); + } else { + // Create a new vector storage + let pool = inner.pool; + *self = InnerVec::from_slice(new_cap, self.as_ref(), pool); + } + } + } + + unsafe fn set_start(&mut self, start: u32) { + // Setting the start to 0 is a no-op, so return early if this is the + // case. + if start == 0 { + return; + } + + let inner = self.as_inner(); + assert!(start <= inner.cap as u32); + + // Updating the start of the view is setting `offset` to point to the + // new start and updating the `len` field to reflect the new length + // of the view. + inner.offset += start; + + if inner.len >= start { + inner.len -= start; + } else { + inner.len = 0; + } + } +} + +impl Drop for InnerVec { + fn drop(&mut self) { + release_shared_vec(self.0.as_ptr()); + } +} + /* * * ===== Inner ===== @@ -1886,27 +2969,28 @@ impl Inner { #[inline] fn from_slice(cap: usize, src: &[u8], pool: PoolRef) -> Inner { - // Store data in vec - let mut vec = Vec::with_capacity(cap + SHARED_VEC_SIZE); + // TODO: vec must be aligned to SharedVec instead of u8 + let mut vec = Vec::::with_capacity((cap / SHARED_VEC_SIZE) + 2); #[allow(clippy::uninit_vec)] unsafe { - vec.set_len(SHARED_VEC_SIZE); - vec.extend_from_slice(src); - + // Store data in vec let len = src.len(); - let full_cap = vec.capacity(); + let full_cap = vec.capacity() * SHARED_VEC_SIZE; let cap = full_cap - SHARED_VEC_SIZE; - let ptr = vec.as_mut_ptr(); + let shared_ptr = vec.as_mut_ptr(); mem::forget(vec); pool.acquire(full_cap); - let shared_vec_ptr = ptr as *mut SharedVec; + let ptr = shared_ptr.add(1) as *mut u8; + ptr::copy_nonoverlapping(src.as_ptr(), ptr, src.len()); ptr::write( - shared_vec_ptr, + shared_ptr, SharedVec { pool, cap: full_cap, ref_count: AtomicUsize::new(1), + len: 0, + offset: 0, }, ); @@ -1914,8 +2998,10 @@ impl Inner { Inner { len, cap, - ptr: ptr.add(SHARED_VEC_SIZE), - arc: NonNull::new_unchecked((ptr as usize ^ KIND_VEC) as *mut Shared), + ptr, + arc: NonNull::new_unchecked( + (shared_ptr as usize ^ KIND_VEC) as *mut Shared, + ), } } } @@ -2890,12 +3976,174 @@ impl PartialEq for Bytes { } } +impl PartialEq for Bytes { + fn eq(&self, other: &BytesVec) -> bool { + other[..] == self[..] + } +} + +impl PartialEq for BytesVec { + fn eq(&self, other: &Bytes) -> bool { + other[..] == self[..] + } +} + impl PartialEq for BytesMut { fn eq(&self, other: &Bytes) -> bool { other[..] == self[..] } } +impl PartialEq for BytesVec { + fn eq(&self, other: &BytesMut) -> bool { + other[..] == self[..] + } +} + +impl PartialEq for BytesMut { + fn eq(&self, other: &BytesVec) -> bool { + other[..] == self[..] + } +} + +impl PartialEq<[u8]> for BytesVec { + fn eq(&self, other: &[u8]) -> bool { + &**self == other + } +} + +impl PartialOrd<[u8]> for BytesVec { + fn partial_cmp(&self, other: &[u8]) -> Option { + (**self).partial_cmp(other) + } +} + +impl PartialEq for [u8] { + fn eq(&self, other: &BytesVec) -> bool { + *other == *self + } +} + +impl PartialOrd for [u8] { + fn partial_cmp(&self, other: &BytesVec) -> Option { + other.partial_cmp(self) + } +} + +impl PartialEq for BytesVec { + fn eq(&self, other: &str) -> bool { + &**self == other.as_bytes() + } +} + +impl PartialOrd for BytesVec { + fn partial_cmp(&self, other: &str) -> Option { + (**self).partial_cmp(other.as_bytes()) + } +} + +impl PartialEq for str { + fn eq(&self, other: &BytesVec) -> bool { + *other == *self + } +} + +impl PartialOrd for str { + fn partial_cmp(&self, other: &BytesVec) -> Option { + other.partial_cmp(self) + } +} + +impl PartialEq> for BytesVec { + fn eq(&self, other: &Vec) -> bool { + *self == other[..] + } +} + +impl PartialOrd> for BytesVec { + fn partial_cmp(&self, other: &Vec) -> Option { + (**self).partial_cmp(&other[..]) + } +} + +impl PartialEq for Vec { + fn eq(&self, other: &BytesVec) -> bool { + *other == *self + } +} + +impl PartialOrd for Vec { + fn partial_cmp(&self, other: &BytesVec) -> Option { + other.partial_cmp(self) + } +} + +impl PartialEq for BytesVec { + fn eq(&self, other: &String) -> bool { + *self == other[..] + } +} + +impl PartialOrd for BytesVec { + fn partial_cmp(&self, other: &String) -> Option { + (**self).partial_cmp(other.as_bytes()) + } +} + +impl PartialEq for String { + fn eq(&self, other: &BytesVec) -> bool { + *other == *self + } +} + +impl PartialOrd for String { + fn partial_cmp(&self, other: &BytesVec) -> Option { + other.partial_cmp(self) + } +} + +impl<'a, T: ?Sized> PartialEq<&'a T> for BytesVec +where + BytesVec: PartialEq, +{ + fn eq(&self, other: &&'a T) -> bool { + *self == **other + } +} + +impl<'a, T: ?Sized> PartialOrd<&'a T> for BytesVec +where + BytesVec: PartialOrd, +{ + fn partial_cmp(&self, other: &&'a T) -> Option { + self.partial_cmp(*other) + } +} + +impl PartialEq for &[u8] { + fn eq(&self, other: &BytesVec) -> bool { + *other == *self + } +} + +impl PartialOrd for &[u8] { + fn partial_cmp(&self, other: &BytesVec) -> Option { + other.partial_cmp(self) + } +} + +impl PartialEq for &str { + fn eq(&self, other: &BytesVec) -> bool { + *other == *self + } +} + +impl PartialOrd for &str { + fn partial_cmp(&self, other: &BytesVec) -> Option { + other.partial_cmp(self) + } +} + // While there is `std::process:abort`, it's only available in Rust 1.17, and // our minimum supported version is currently 1.15. So, this acts as an abort // by triggering a double panic, which always aborts in Rust. diff --git a/ntex-bytes/src/lib.rs b/ntex-bytes/src/lib.rs index 0b0ac20f..5ccdced4 100644 --- a/ntex-bytes/src/lib.rs +++ b/ntex-bytes/src/lib.rs @@ -39,7 +39,7 @@ //! let b = buf.split(); //! assert_eq!(b, b"goodbye world"[..]); //! -//! assert_eq!(buf.capacity(), 998); +//! assert_eq!(buf.capacity(), 1030); //! ``` //! //! In the above example, only a single buffer of 1024 is allocated. The handles @@ -68,7 +68,7 @@ mod pool; mod serde; mod string; -pub use crate::bytes::{Bytes, BytesMut}; +pub use crate::bytes::{Bytes, BytesMut, BytesVec}; pub use crate::string::ByteString; #[doc(hidden)] diff --git a/ntex-bytes/src/pool.rs b/ntex-bytes/src/pool.rs index 5bc0d604..fc8118cb 100644 --- a/ntex-bytes/src/pool.rs +++ b/ntex-bytes/src/pool.rs @@ -2,11 +2,11 @@ use std::sync::atomic::Ordering::{Relaxed, Release}; use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::task::{Context, Poll, Waker}; -use std::{cell::Cell, cell::RefCell, fmt, future::Future, mem, pin::Pin, rc::Rc}; +use std::{cell::Cell, cell::RefCell, fmt, future::Future, mem, pin::Pin, ptr, rc::Rc}; use futures_core::task::__internal::AtomicWaker; -use crate::BytesMut; +use crate::{BytesMut, BytesVec}; pub struct Pool { idx: Cell, @@ -190,12 +190,23 @@ impl PoolRef { buf.move_to_pool(self); } + #[inline] + pub fn move_vec_in(self, buf: &mut BytesVec) { + buf.move_to_pool(self); + } + #[inline] /// Creates a new `BytesMut` with the specified capacity. pub fn buf_with_capacity(self, cap: usize) -> BytesMut { BytesMut::with_capacity_in_priv(cap, self) } + #[inline] + /// Creates a new `BytesVec` with the specified capacity. + pub fn vec_with_capacity(self, cap: usize) -> BytesVec { + BytesVec::with_capacity_in_priv(cap, self) + } + #[doc(hidden)] #[inline] /// Set max pool size @@ -378,6 +389,14 @@ impl fmt::Debug for PoolRef { } } +impl Eq for PoolRef {} + +impl PartialEq for PoolRef { + fn eq(&self, other: &PoolRef) -> bool { + ptr::eq(&self.0, &other.0) + } +} + impl MemoryPool { fn create(id: PoolId) -> &'static MemoryPool { Box::leak(Box::new(MemoryPool { diff --git a/ntex-bytes/tests/test_bytes.rs b/ntex-bytes/tests/test_bytes.rs index cd55cbb7..b853a9ae 100644 --- a/ntex-bytes/tests/test_bytes.rs +++ b/ntex-bytes/tests/test_bytes.rs @@ -1,7 +1,7 @@ #![deny(warnings, rust_2018_idioms)] use std::task::Poll; -use ntex_bytes::{Buf, BufMut, Bytes, BytesMut, PoolId}; +use ntex_bytes::{Buf, BufMut, Bytes, BytesMut, BytesVec, PoolId}; const LONG: &[u8] = b"mary had a little lamb, little lamb, little lamb"; const SHORT: &[u8] = b"hello world"; @@ -13,7 +13,7 @@ fn inline_cap() -> usize { const fn shared_vec() -> usize { use std::mem; - 3 * mem::size_of::() + 3 * mem::size_of::() + 8 } fn is_sync() {} @@ -64,6 +64,20 @@ fn from_slice() { assert_eq!(b"abcdefgh"[..], a); assert_eq!(&b"abcdefgh"[..], a); assert_eq!(Vec::from(&b"abcdefgh"[..]), a); + + let a = BytesVec::copy_from_slice(&b"abcdefgh"[..]); + assert_eq!(a, b"abcdefgh"[..]); + assert_eq!(a, &b"abcdefgh"[..]); + assert_eq!(b"abcdefgh"[..], a); + assert_eq!(&b"abcdefgh"[..], a); + assert_eq!(Vec::from(&b"abcdefgh"[..]), a); + + let a = BytesVec::copy_from_slice_in(&b"abcdefgh"[..], PoolId::P10); + assert_eq!(a, b"abcdefgh"[..]); + assert_eq!(a, &b"abcdefgh"[..]); + assert_eq!(b"abcdefgh"[..], a); + assert_eq!(&b"abcdefgh"[..], a); + assert_eq!(Vec::from(&b"abcdefgh"[..]), a); } #[test] @@ -75,6 +89,9 @@ fn fmt() { let a = format!("{:?}", BytesMut::from(&b"abcdefg"[..])); assert_eq!(a, b); + + let a = format!("{:?}", BytesVec::copy_from_slice(&b"abcdefg"[..])); + assert_eq!(a, b); } #[test] @@ -92,7 +109,20 @@ fn fmt_write() { write!(b, "{}", &s[32..64]).unwrap(); assert_eq!(b, s[..64].as_bytes()); - let mut c = BytesMut::with_capacity(64); + let mut c = BytesMut::with_capacity(2); + write!(c, "{}", s).unwrap_err(); + assert!(c.is_empty()); + + let mut a = BytesVec::with_capacity(64); + write!(a, "{}", &s[..64]).unwrap(); + assert_eq!(a, s[..64].as_bytes()); + + let mut b = BytesVec::with_capacity(64); + write!(b, "{}", &s[..32]).unwrap(); + write!(b, "{}", &s[32..64]).unwrap(); + assert_eq!(b, s[..64].as_bytes()); + + let mut c = BytesVec::with_capacity(2); write!(c, "{}", s).unwrap_err(); assert!(c.is_empty()); } @@ -105,11 +135,17 @@ fn len() { let a = BytesMut::from(&b"abcdefg"[..]); assert_eq!(a.len(), 7); + let a = BytesVec::copy_from_slice(&b"abcdefg"[..]); + assert_eq!(a.len(), 7); + let a = Bytes::from(&b""[..]); assert!(a.is_empty()); let a = BytesMut::from(&b""[..]); assert!(a.is_empty()); + + let a = BytesVec::copy_from_slice(&b""[..]); + assert!(a.is_empty()); } #[test] @@ -141,12 +177,22 @@ fn inline() { let mut a = BytesMut::from(vec![b'*'; 35]).freeze(); a.truncate(8); assert!(a.is_inline()); + + let mut a = BytesVec::copy_from_slice(&vec![b'*'; 35]).freeze(); + let b = a.split_to(8); + assert!(b.is_inline()); } #[test] fn index() { let a = Bytes::from(&b"hello world"[..]); assert_eq!(a[0..5], *b"hello"); + + let a = BytesMut::from(&b"hello world"[..]); + assert_eq!(a[0..5], *b"hello"); + + let a = BytesVec::copy_from_slice(&b"hello world"[..]); + assert_eq!(a[0..5], *b"hello"); } #[test] @@ -217,7 +263,7 @@ fn split_off_uninitialized() { assert_eq!(bytes.capacity(), 128); assert_eq!(other.len(), 0); - assert_eq!(other.capacity(), 896); + assert_eq!(other.capacity(), 928); } #[test] @@ -285,6 +331,32 @@ fn split_to_1() { assert_eq!(LONG[30..], a); assert_eq!(LONG[..30], b); + + // bytes mut + let mut a = BytesMut::from(Vec::from(LONG)); + let b = a.split_to(4); + + assert_eq!(LONG[4..], a); + assert_eq!(LONG[..4], b); + + let mut a = BytesMut::from(Vec::from(LONG)); + let b = a.split_to(30); + + assert_eq!(LONG[30..], a); + assert_eq!(LONG[..30], b); + + // bytes vec + let mut a = BytesVec::copy_from_slice(LONG); + let b = a.split_to(4); + + assert_eq!(LONG[4..], a); + assert_eq!(LONG[..4], b); + + let mut a = BytesVec::copy_from_slice(LONG); + let b = a.split_to(30); + + assert_eq!(LONG[30..], a); + assert_eq!(LONG[..30], b); } #[test] @@ -296,6 +368,22 @@ fn split_to_2() { assert_eq!(LONG[1..], a); drop(b); + + let mut a = BytesMut::from(LONG); + assert_eq!(LONG, a); + + let b = a.split_to(1); + + assert_eq!(LONG[1..], a); + drop(b); + + let mut a = BytesVec::copy_from_slice(LONG); + assert_eq!(LONG, a); + + let b = a.split_to(1); + + assert_eq!(LONG[1..], a); + drop(b); } #[test] @@ -310,6 +398,9 @@ fn split_to_oob() { fn split_to_oob_mut() { let mut hello = BytesMut::from(&b"helloworld"[..]); hello.split_to(inline_cap() + 1); + + let mut hello = BytesVec::copy_from_slice(&b"helloworld"[..]); + hello.split_to(inline_cap() + 1); } #[test] @@ -317,6 +408,9 @@ fn split_to_oob_mut() { fn split_to_uninitialized() { let mut bytes = BytesMut::with_capacity(1024); let _other = bytes.split_to(128); + + let mut bytes = BytesVec::with_capacity(1024); + let _other = bytes.split_to(128); } #[test] @@ -353,6 +447,14 @@ fn fns_defined_for_bytes_mut() { // Iterator let v: Vec = bytes.as_ref().iter().cloned().collect(); assert_eq!(&v[..], bytes); + + let mut bytes = BytesVec::copy_from_slice(&b"hello world"[..]); + bytes.as_ptr(); + bytes.as_mut_ptr(); + + // Iterator + let v: Vec = bytes.as_ref().iter().cloned().collect(); + assert_eq!(&v[..], bytes); } #[test] @@ -360,7 +462,7 @@ fn reserve_convert() { // Vec -> Vec let mut bytes = BytesMut::from(LONG); bytes.reserve(64); - assert_eq!(bytes.capacity(), LONG.len() + 64); + assert_eq!(bytes.capacity(), LONG.len() + 80); // Arc -> Vec let mut bytes = BytesMut::from(Vec::from(LONG)); @@ -370,6 +472,11 @@ fn reserve_convert() { assert!(bytes.capacity() >= bytes.len() + 128); drop(a); + + // Vec -> Vec + let mut bytes = BytesVec::copy_from_slice(LONG); + bytes.reserve(64); + assert_eq!(bytes.capacity(), LONG.len() + 80); } // Without either looking at the internals of the BytesMut or doing weird stuff @@ -387,6 +494,17 @@ fn reserve_vec_recycling() { assert_eq!(bytes.capacity(), 16); } +#[test] +fn reserve_recycling() { + let mut bytes = BytesVec::with_capacity(16); + assert_eq!(bytes.capacity(), 32); + bytes.put("0123456789012345".as_bytes()); + bytes.advance(10); + assert_eq!(bytes.capacity(), 22); + bytes.reserve(32); + assert_eq!(bytes.capacity(), 64); +} + #[test] fn reserve_in_arc_unique_does_not_overallocate() { let mut bytes = BytesMut::with_capacity(1000); @@ -394,9 +512,9 @@ fn reserve_in_arc_unique_does_not_overallocate() { // now bytes is Arc and refcount == 1 - assert_eq!(1000, bytes.capacity()); + assert_eq!(1024, bytes.capacity()); bytes.reserve(2001); - assert_eq!(2001, bytes.capacity()); + assert_eq!(2016, bytes.capacity()); } #[test] @@ -406,9 +524,9 @@ fn reserve_in_arc_unique_doubles() { // now bytes is Arc and refcount == 1 - assert_eq!(1000, bytes.capacity()); - bytes.reserve(1001); - assert_eq!(1001, bytes.capacity()); + assert_eq!(1024, bytes.capacity()); + bytes.reserve(1025); + assert_eq!(1056, bytes.capacity()); } #[test] @@ -418,9 +536,9 @@ fn reserve_in_arc_nonunique_does_not_overallocate() { // now bytes is Arc and refcount == 2 - assert_eq!(1000, bytes.capacity()); + assert_eq!(1024, bytes.capacity()); bytes.reserve(2001); - assert_eq!(2001, bytes.capacity()); + assert_eq!(2016, bytes.capacity()); } #[test] @@ -437,6 +555,10 @@ fn extend_mut() { let mut bytes = BytesMut::with_capacity(0); bytes.extend(LONG); assert_eq!(*bytes, LONG[..]); + + let mut bytes = BytesVec::with_capacity(0); + bytes.extend(LONG); + assert_eq!(*bytes, LONG[..]); } #[test] @@ -447,6 +569,13 @@ fn extend_from_slice_mut() { bytes.extend_from_slice(&LONG[i..]); assert_eq!(LONG[..], *bytes); } + + for &i in &[3, 34] { + let mut bytes = BytesVec::new(); + bytes.extend_from_slice(&LONG[..i]); + bytes.extend_from_slice(&LONG[i..]); + assert_eq!(LONG[..], *bytes); + } } #[test] @@ -487,6 +616,20 @@ fn advance_vec() { a.advance(6); assert_eq!(a, b"d zomg wat wat"[..]); + + let mut a = BytesVec::copy_from_slice(b"hello world boooo yah world zomg wat wat"); + a.advance(16); + assert_eq!(a, b"o yah world zomg wat wat"[..]); + + a.advance(4); + assert_eq!(a, b"h world zomg wat wat"[..]); + + // Reserve some space. + a.reserve(1024); + assert_eq!(a, b"h world zomg wat wat"[..]); + + a.advance(6); + assert_eq!(a, b"d zomg wat wat"[..]); } #[test] @@ -496,6 +639,13 @@ fn advance_past_len() { a.advance(20); } +#[test] +#[should_panic] +fn advance_past_len_vec() { + let mut a = BytesVec::copy_from_slice(b"hello world"); + a.advance(20); +} + #[test] // Only run these tests on little endian systems. CI uses qemu for testing // little endian... and qemu doesn't really support threading all that well. @@ -545,6 +695,17 @@ fn partial_eq_bytesmut() { assert!(bytesmut != bytes2); } +#[test] +fn partial_eq_bytesvec() { + let bytes = Bytes::from(&b"The quick red fox"[..]); + let bytesmut = BytesVec::copy_from_slice(&b"The quick red fox"[..]); + assert!(bytes == bytesmut); + assert!(bytesmut == bytes); + let bytes2 = Bytes::from(&b"Jumped over the lazy brown dog"[..]); + assert!(bytes2 != bytesmut); + assert!(bytesmut != bytes2); +} + #[test] fn from_iter_no_size_hint() { use std::iter; @@ -619,15 +780,48 @@ fn empty_slice_ref_catches_not_an_empty_subset() { bytes.slice_ref(slice); } +#[test] +fn bytes_vec_freeze() { + let bytes = BytesVec::copy_from_slice(b"12345"); + assert_eq!(bytes, &b"12345"[..]); + let b = bytes.freeze(); + assert_eq!(b, &b"12345"[..]); + assert!(b.is_inline()); + + let bytes = BytesVec::copy_from_slice(LONG); + assert_eq!(bytes, LONG); + let b = bytes.freeze(); + assert_eq!(b, LONG); +} + +#[test] +fn bytes_vec() { + let mut bytes = BytesVec::copy_from_slice(LONG); + bytes.with_bytes_mut(|buf| { + assert_eq!(buf.split_to(4), &LONG[..4]); + }); + assert_eq!(bytes, &LONG[4..]); + + bytes.with_bytes_mut(|buf| { + assert_eq!(buf.split_off(10), &LONG[14..]); + }); + assert_eq!(bytes, &LONG[4..14]); + + bytes.with_bytes_mut(|buf| { + *buf = BytesMut::from(b"12345".to_vec()); + }); + assert_eq!(bytes, &"12345"[..]); +} + #[test] fn pool() { // Pool let p1 = PoolId::P1.pool_ref(); assert_eq!(p1.allocated(), 0); let mut buf = BytesMut::with_capacity_in(1024, p1); - assert_eq!(p1.allocated(), 1024 + shared_vec()); + assert_eq!(p1.allocated(), 1056 + shared_vec()); buf.reserve(2048); - assert_eq!(p1.allocated(), 2048 + shared_vec()); + assert_eq!(p1.allocated(), 2080 + shared_vec()); drop(buf); assert_eq!(p1.allocated(), 0); @@ -635,18 +829,26 @@ fn pool() { let p = PoolId::DEFAULT.pool_ref(); assert_eq!(p.allocated(), 0); let mut buf = BytesMut::with_capacity(1024); - assert_eq!(p.allocated(), 1024 + shared_vec()); + assert_eq!(p.allocated(), 1056 + shared_vec()); buf.reserve(2048); - assert_eq!(p.allocated(), 2048 + shared_vec()); + assert_eq!(p.allocated(), 2080 + shared_vec()); drop(buf); assert_eq!(p.allocated(), 0); let mut buf = BytesMut::with_capacity(1024); - assert_eq!(p.allocated(), 1024 + shared_vec()); + assert_eq!(p.allocated(), 1056 + shared_vec()); assert_eq!(p1.allocated(), 0); p1.move_in(&mut buf); assert_eq!(p.allocated(), 0); - assert_eq!(p1.allocated(), 1024 + shared_vec()); + assert_eq!(p1.allocated(), 1056 + shared_vec()); + + let p1 = PoolId::P2.pool_ref(); + let mut buf = BytesVec::with_capacity(1024); + assert_eq!(p.allocated(), 1056 + shared_vec()); + assert_eq!(p1.allocated(), 0); + p1.move_vec_in(&mut buf); + assert_eq!(p.allocated(), 0); + assert_eq!(p1.allocated(), 1056 + shared_vec()); } #[ntex::test] diff --git a/ntex-service/src/pipeline.rs b/ntex-service/src/pipeline.rs index 7a0db987..5b3d3248 100644 --- a/ntex-service/src/pipeline.rs +++ b/ntex-service/src/pipeline.rs @@ -1,7 +1,6 @@ use std::{marker::PhantomData, task::Context, task::Poll}; use crate::and_then::{AndThen, AndThenFactory}; -// use crate::and_then_apply_fn::{AndThenApplyFn, AndThenApplyFnFactory}; use crate::map::{Map, MapServiceFactory}; use crate::map_err::{MapErr, MapErrServiceFactory}; use crate::map_init_err::MapInitErr; @@ -61,28 +60,6 @@ impl, R> Pipeline { } } - // /// Apply function to specified service and use it as a next service in - // /// chain. - // /// - // /// Short version of `pipeline_factory(...).and_then(apply_fn_factory(...))` - // pub fn and_then_apply_fn( - // self, - // service: I, - // f: F, - // ) -> Pipeline + Clone> - // where - // Self: Sized, - // I: IntoService, - // U: Service, - // F: Fn(T::Response, &U) -> Fut, - // Fut: Future>, - // Err: From + From, - // { - // Pipeline { - // service: AndThenApplyFn::new(self.service, service.into_service(), f), - // } - // } - /// Chain on a computation for when a call to the service finished, /// passing the result of the call to the next service `U`. /// @@ -197,39 +174,6 @@ impl, R, C> PipelineFactory { } } - // /// Apply function to specified service and use it as a next service in - // /// chain. - // /// - // /// Short version of `pipeline_factory(...).and_then(apply_fn_factory(...))` - // pub fn and_then_apply_fn( - // self, - // factory: I, - // f: F, - // ) -> PipelineFactory< - // impl ServiceFactory< - // Request = T::Request, - // Response = Res, - // Error = Err, - // Config = T::Config, - // InitError = T::InitError, - // Service = impl Service - // + Clone, - // > + Clone, - // > - // where - // Self: Sized, - // T::Config: Clone, - // I: IntoServiceFactory, - // U: ServiceFactory, - // F: Fn(T::Response, &U::Service) -> Fut + Clone, - // Fut: Future>, - // Err: From + From, - // { - // PipelineFactory { - // factory: AndThenApplyFnFactory::new(self.factory, factory.into_factory(), f), - // } - // } - /// Apply transform to current service factory. /// /// Short version of `apply(transform, pipeline_factory(...))`