Add BytesVec type

This commit is contained in:
Nikolay Kim 2022-01-29 23:53:13 +06:00
parent 2d13488c17
commit a2a5899bbe
7 changed files with 1524 additions and 107 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.11] (2022-01-30)
* Add BytesVec type
## [0.1.10] (2022-01-26)
* Rename Pool::is_pending() to is_ready()

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-bytes"
version = "0.1.10"
version = "0.1.11"
license = "MIT"
authors = ["Nikolay Kim <fafhrd91@gmail.com>", "Carl Lerche <me@carllerche.com>"]
description = "Types and traits for working with bytes (bytes crate fork)"

File diff suppressed because it is too large Load diff

View file

@ -39,7 +39,7 @@
//! let b = buf.split();
//! assert_eq!(b, b"goodbye world"[..]);
//!
//! assert_eq!(buf.capacity(), 998);
//! assert_eq!(buf.capacity(), 1030);
//! ```
//!
//! In the above example, only a single buffer of 1024 is allocated. The handles
@ -68,7 +68,7 @@ mod pool;
mod serde;
mod string;
pub use crate::bytes::{Bytes, BytesMut};
pub use crate::bytes::{Bytes, BytesMut, BytesVec};
pub use crate::string::ByteString;
#[doc(hidden)]

View file

@ -2,11 +2,11 @@
use std::sync::atomic::Ordering::{Relaxed, Release};
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::task::{Context, Poll, Waker};
use std::{cell::Cell, cell::RefCell, fmt, future::Future, mem, pin::Pin, rc::Rc};
use std::{cell::Cell, cell::RefCell, fmt, future::Future, mem, pin::Pin, ptr, rc::Rc};
use futures_core::task::__internal::AtomicWaker;
use crate::BytesMut;
use crate::{BytesMut, BytesVec};
pub struct Pool {
idx: Cell<usize>,
@ -190,12 +190,23 @@ impl PoolRef {
buf.move_to_pool(self);
}
#[inline]
pub fn move_vec_in(self, buf: &mut BytesVec) {
buf.move_to_pool(self);
}
#[inline]
/// Creates a new `BytesMut` with the specified capacity.
pub fn buf_with_capacity(self, cap: usize) -> BytesMut {
BytesMut::with_capacity_in_priv(cap, self)
}
#[inline]
/// Creates a new `BytesVec` with the specified capacity.
pub fn vec_with_capacity(self, cap: usize) -> BytesVec {
BytesVec::with_capacity_in_priv(cap, self)
}
#[doc(hidden)]
#[inline]
/// Set max pool size
@ -378,6 +389,14 @@ impl fmt::Debug for PoolRef {
}
}
impl Eq for PoolRef {}
impl PartialEq for PoolRef {
fn eq(&self, other: &PoolRef) -> bool {
ptr::eq(&self.0, &other.0)
}
}
impl MemoryPool {
fn create(id: PoolId) -> &'static MemoryPool {
Box::leak(Box::new(MemoryPool {

View file

@ -1,7 +1,7 @@
#![deny(warnings, rust_2018_idioms)]
use std::task::Poll;
use ntex_bytes::{Buf, BufMut, Bytes, BytesMut, PoolId};
use ntex_bytes::{Buf, BufMut, Bytes, BytesMut, BytesVec, PoolId};
const LONG: &[u8] = b"mary had a little lamb, little lamb, little lamb";
const SHORT: &[u8] = b"hello world";
@ -13,7 +13,7 @@ fn inline_cap() -> usize {
const fn shared_vec() -> usize {
use std::mem;
3 * mem::size_of::<usize>()
3 * mem::size_of::<usize>() + 8
}
fn is_sync<T: Sync>() {}
@ -64,6 +64,20 @@ fn from_slice() {
assert_eq!(b"abcdefgh"[..], a);
assert_eq!(&b"abcdefgh"[..], a);
assert_eq!(Vec::from(&b"abcdefgh"[..]), a);
let a = BytesVec::copy_from_slice(&b"abcdefgh"[..]);
assert_eq!(a, b"abcdefgh"[..]);
assert_eq!(a, &b"abcdefgh"[..]);
assert_eq!(b"abcdefgh"[..], a);
assert_eq!(&b"abcdefgh"[..], a);
assert_eq!(Vec::from(&b"abcdefgh"[..]), a);
let a = BytesVec::copy_from_slice_in(&b"abcdefgh"[..], PoolId::P10);
assert_eq!(a, b"abcdefgh"[..]);
assert_eq!(a, &b"abcdefgh"[..]);
assert_eq!(b"abcdefgh"[..], a);
assert_eq!(&b"abcdefgh"[..], a);
assert_eq!(Vec::from(&b"abcdefgh"[..]), a);
}
#[test]
@ -75,6 +89,9 @@ fn fmt() {
let a = format!("{:?}", BytesMut::from(&b"abcdefg"[..]));
assert_eq!(a, b);
let a = format!("{:?}", BytesVec::copy_from_slice(&b"abcdefg"[..]));
assert_eq!(a, b);
}
#[test]
@ -92,7 +109,20 @@ fn fmt_write() {
write!(b, "{}", &s[32..64]).unwrap();
assert_eq!(b, s[..64].as_bytes());
let mut c = BytesMut::with_capacity(64);
let mut c = BytesMut::with_capacity(2);
write!(c, "{}", s).unwrap_err();
assert!(c.is_empty());
let mut a = BytesVec::with_capacity(64);
write!(a, "{}", &s[..64]).unwrap();
assert_eq!(a, s[..64].as_bytes());
let mut b = BytesVec::with_capacity(64);
write!(b, "{}", &s[..32]).unwrap();
write!(b, "{}", &s[32..64]).unwrap();
assert_eq!(b, s[..64].as_bytes());
let mut c = BytesVec::with_capacity(2);
write!(c, "{}", s).unwrap_err();
assert!(c.is_empty());
}
@ -105,11 +135,17 @@ fn len() {
let a = BytesMut::from(&b"abcdefg"[..]);
assert_eq!(a.len(), 7);
let a = BytesVec::copy_from_slice(&b"abcdefg"[..]);
assert_eq!(a.len(), 7);
let a = Bytes::from(&b""[..]);
assert!(a.is_empty());
let a = BytesMut::from(&b""[..]);
assert!(a.is_empty());
let a = BytesVec::copy_from_slice(&b""[..]);
assert!(a.is_empty());
}
#[test]
@ -141,12 +177,22 @@ fn inline() {
let mut a = BytesMut::from(vec![b'*'; 35]).freeze();
a.truncate(8);
assert!(a.is_inline());
let mut a = BytesVec::copy_from_slice(&vec![b'*'; 35]).freeze();
let b = a.split_to(8);
assert!(b.is_inline());
}
#[test]
fn index() {
let a = Bytes::from(&b"hello world"[..]);
assert_eq!(a[0..5], *b"hello");
let a = BytesMut::from(&b"hello world"[..]);
assert_eq!(a[0..5], *b"hello");
let a = BytesVec::copy_from_slice(&b"hello world"[..]);
assert_eq!(a[0..5], *b"hello");
}
#[test]
@ -217,7 +263,7 @@ fn split_off_uninitialized() {
assert_eq!(bytes.capacity(), 128);
assert_eq!(other.len(), 0);
assert_eq!(other.capacity(), 896);
assert_eq!(other.capacity(), 928);
}
#[test]
@ -285,6 +331,32 @@ fn split_to_1() {
assert_eq!(LONG[30..], a);
assert_eq!(LONG[..30], b);
// bytes mut
let mut a = BytesMut::from(Vec::from(LONG));
let b = a.split_to(4);
assert_eq!(LONG[4..], a);
assert_eq!(LONG[..4], b);
let mut a = BytesMut::from(Vec::from(LONG));
let b = a.split_to(30);
assert_eq!(LONG[30..], a);
assert_eq!(LONG[..30], b);
// bytes vec
let mut a = BytesVec::copy_from_slice(LONG);
let b = a.split_to(4);
assert_eq!(LONG[4..], a);
assert_eq!(LONG[..4], b);
let mut a = BytesVec::copy_from_slice(LONG);
let b = a.split_to(30);
assert_eq!(LONG[30..], a);
assert_eq!(LONG[..30], b);
}
#[test]
@ -296,6 +368,22 @@ fn split_to_2() {
assert_eq!(LONG[1..], a);
drop(b);
let mut a = BytesMut::from(LONG);
assert_eq!(LONG, a);
let b = a.split_to(1);
assert_eq!(LONG[1..], a);
drop(b);
let mut a = BytesVec::copy_from_slice(LONG);
assert_eq!(LONG, a);
let b = a.split_to(1);
assert_eq!(LONG[1..], a);
drop(b);
}
#[test]
@ -310,6 +398,9 @@ fn split_to_oob() {
fn split_to_oob_mut() {
let mut hello = BytesMut::from(&b"helloworld"[..]);
hello.split_to(inline_cap() + 1);
let mut hello = BytesVec::copy_from_slice(&b"helloworld"[..]);
hello.split_to(inline_cap() + 1);
}
#[test]
@ -317,6 +408,9 @@ fn split_to_oob_mut() {
fn split_to_uninitialized() {
let mut bytes = BytesMut::with_capacity(1024);
let _other = bytes.split_to(128);
let mut bytes = BytesVec::with_capacity(1024);
let _other = bytes.split_to(128);
}
#[test]
@ -353,6 +447,14 @@ fn fns_defined_for_bytes_mut() {
// Iterator
let v: Vec<u8> = bytes.as_ref().iter().cloned().collect();
assert_eq!(&v[..], bytes);
let mut bytes = BytesVec::copy_from_slice(&b"hello world"[..]);
bytes.as_ptr();
bytes.as_mut_ptr();
// Iterator
let v: Vec<u8> = bytes.as_ref().iter().cloned().collect();
assert_eq!(&v[..], bytes);
}
#[test]
@ -360,7 +462,7 @@ fn reserve_convert() {
// Vec -> Vec
let mut bytes = BytesMut::from(LONG);
bytes.reserve(64);
assert_eq!(bytes.capacity(), LONG.len() + 64);
assert_eq!(bytes.capacity(), LONG.len() + 80);
// Arc -> Vec
let mut bytes = BytesMut::from(Vec::from(LONG));
@ -370,6 +472,11 @@ fn reserve_convert() {
assert!(bytes.capacity() >= bytes.len() + 128);
drop(a);
// Vec -> Vec
let mut bytes = BytesVec::copy_from_slice(LONG);
bytes.reserve(64);
assert_eq!(bytes.capacity(), LONG.len() + 80);
}
// Without either looking at the internals of the BytesMut or doing weird stuff
@ -387,6 +494,17 @@ fn reserve_vec_recycling() {
assert_eq!(bytes.capacity(), 16);
}
#[test]
fn reserve_recycling() {
let mut bytes = BytesVec::with_capacity(16);
assert_eq!(bytes.capacity(), 32);
bytes.put("0123456789012345".as_bytes());
bytes.advance(10);
assert_eq!(bytes.capacity(), 22);
bytes.reserve(32);
assert_eq!(bytes.capacity(), 64);
}
#[test]
fn reserve_in_arc_unique_does_not_overallocate() {
let mut bytes = BytesMut::with_capacity(1000);
@ -394,9 +512,9 @@ fn reserve_in_arc_unique_does_not_overallocate() {
// now bytes is Arc and refcount == 1
assert_eq!(1000, bytes.capacity());
assert_eq!(1024, bytes.capacity());
bytes.reserve(2001);
assert_eq!(2001, bytes.capacity());
assert_eq!(2016, bytes.capacity());
}
#[test]
@ -406,9 +524,9 @@ fn reserve_in_arc_unique_doubles() {
// now bytes is Arc and refcount == 1
assert_eq!(1000, bytes.capacity());
bytes.reserve(1001);
assert_eq!(1001, bytes.capacity());
assert_eq!(1024, bytes.capacity());
bytes.reserve(1025);
assert_eq!(1056, bytes.capacity());
}
#[test]
@ -418,9 +536,9 @@ fn reserve_in_arc_nonunique_does_not_overallocate() {
// now bytes is Arc and refcount == 2
assert_eq!(1000, bytes.capacity());
assert_eq!(1024, bytes.capacity());
bytes.reserve(2001);
assert_eq!(2001, bytes.capacity());
assert_eq!(2016, bytes.capacity());
}
#[test]
@ -437,6 +555,10 @@ fn extend_mut() {
let mut bytes = BytesMut::with_capacity(0);
bytes.extend(LONG);
assert_eq!(*bytes, LONG[..]);
let mut bytes = BytesVec::with_capacity(0);
bytes.extend(LONG);
assert_eq!(*bytes, LONG[..]);
}
#[test]
@ -447,6 +569,13 @@ fn extend_from_slice_mut() {
bytes.extend_from_slice(&LONG[i..]);
assert_eq!(LONG[..], *bytes);
}
for &i in &[3, 34] {
let mut bytes = BytesVec::new();
bytes.extend_from_slice(&LONG[..i]);
bytes.extend_from_slice(&LONG[i..]);
assert_eq!(LONG[..], *bytes);
}
}
#[test]
@ -487,6 +616,20 @@ fn advance_vec() {
a.advance(6);
assert_eq!(a, b"d zomg wat wat"[..]);
let mut a = BytesVec::copy_from_slice(b"hello world boooo yah world zomg wat wat");
a.advance(16);
assert_eq!(a, b"o yah world zomg wat wat"[..]);
a.advance(4);
assert_eq!(a, b"h world zomg wat wat"[..]);
// Reserve some space.
a.reserve(1024);
assert_eq!(a, b"h world zomg wat wat"[..]);
a.advance(6);
assert_eq!(a, b"d zomg wat wat"[..]);
}
#[test]
@ -496,6 +639,13 @@ fn advance_past_len() {
a.advance(20);
}
#[test]
#[should_panic]
fn advance_past_len_vec() {
let mut a = BytesVec::copy_from_slice(b"hello world");
a.advance(20);
}
#[test]
// Only run these tests on little endian systems. CI uses qemu for testing
// little endian... and qemu doesn't really support threading all that well.
@ -545,6 +695,17 @@ fn partial_eq_bytesmut() {
assert!(bytesmut != bytes2);
}
#[test]
fn partial_eq_bytesvec() {
let bytes = Bytes::from(&b"The quick red fox"[..]);
let bytesmut = BytesVec::copy_from_slice(&b"The quick red fox"[..]);
assert!(bytes == bytesmut);
assert!(bytesmut == bytes);
let bytes2 = Bytes::from(&b"Jumped over the lazy brown dog"[..]);
assert!(bytes2 != bytesmut);
assert!(bytesmut != bytes2);
}
#[test]
fn from_iter_no_size_hint() {
use std::iter;
@ -619,15 +780,48 @@ fn empty_slice_ref_catches_not_an_empty_subset() {
bytes.slice_ref(slice);
}
#[test]
fn bytes_vec_freeze() {
let bytes = BytesVec::copy_from_slice(b"12345");
assert_eq!(bytes, &b"12345"[..]);
let b = bytes.freeze();
assert_eq!(b, &b"12345"[..]);
assert!(b.is_inline());
let bytes = BytesVec::copy_from_slice(LONG);
assert_eq!(bytes, LONG);
let b = bytes.freeze();
assert_eq!(b, LONG);
}
#[test]
fn bytes_vec() {
let mut bytes = BytesVec::copy_from_slice(LONG);
bytes.with_bytes_mut(|buf| {
assert_eq!(buf.split_to(4), &LONG[..4]);
});
assert_eq!(bytes, &LONG[4..]);
bytes.with_bytes_mut(|buf| {
assert_eq!(buf.split_off(10), &LONG[14..]);
});
assert_eq!(bytes, &LONG[4..14]);
bytes.with_bytes_mut(|buf| {
*buf = BytesMut::from(b"12345".to_vec());
});
assert_eq!(bytes, &"12345"[..]);
}
#[test]
fn pool() {
// Pool
let p1 = PoolId::P1.pool_ref();
assert_eq!(p1.allocated(), 0);
let mut buf = BytesMut::with_capacity_in(1024, p1);
assert_eq!(p1.allocated(), 1024 + shared_vec());
assert_eq!(p1.allocated(), 1056 + shared_vec());
buf.reserve(2048);
assert_eq!(p1.allocated(), 2048 + shared_vec());
assert_eq!(p1.allocated(), 2080 + shared_vec());
drop(buf);
assert_eq!(p1.allocated(), 0);
@ -635,18 +829,26 @@ fn pool() {
let p = PoolId::DEFAULT.pool_ref();
assert_eq!(p.allocated(), 0);
let mut buf = BytesMut::with_capacity(1024);
assert_eq!(p.allocated(), 1024 + shared_vec());
assert_eq!(p.allocated(), 1056 + shared_vec());
buf.reserve(2048);
assert_eq!(p.allocated(), 2048 + shared_vec());
assert_eq!(p.allocated(), 2080 + shared_vec());
drop(buf);
assert_eq!(p.allocated(), 0);
let mut buf = BytesMut::with_capacity(1024);
assert_eq!(p.allocated(), 1024 + shared_vec());
assert_eq!(p.allocated(), 1056 + shared_vec());
assert_eq!(p1.allocated(), 0);
p1.move_in(&mut buf);
assert_eq!(p.allocated(), 0);
assert_eq!(p1.allocated(), 1024 + shared_vec());
assert_eq!(p1.allocated(), 1056 + shared_vec());
let p1 = PoolId::P2.pool_ref();
let mut buf = BytesVec::with_capacity(1024);
assert_eq!(p.allocated(), 1056 + shared_vec());
assert_eq!(p1.allocated(), 0);
p1.move_vec_in(&mut buf);
assert_eq!(p.allocated(), 0);
assert_eq!(p1.allocated(), 1056 + shared_vec());
}
#[ntex::test]

View file

@ -1,7 +1,6 @@
use std::{marker::PhantomData, task::Context, task::Poll};
use crate::and_then::{AndThen, AndThenFactory};
// use crate::and_then_apply_fn::{AndThenApplyFn, AndThenApplyFnFactory};
use crate::map::{Map, MapServiceFactory};
use crate::map_err::{MapErr, MapErrServiceFactory};
use crate::map_init_err::MapInitErr;
@ -61,28 +60,6 @@ impl<T: Service<R>, R> Pipeline<T, R> {
}
}
// /// Apply function to specified service and use it as a next service in
// /// chain.
// ///
// /// Short version of `pipeline_factory(...).and_then(apply_fn_factory(...))`
// pub fn and_then_apply_fn<U, I, F, Fut, Res, Err>(
// self,
// service: I,
// f: F,
// ) -> Pipeline<impl Service<Request = T::Request, Response = Res, Error = Err> + Clone>
// where
// Self: Sized,
// I: IntoService<U>,
// U: Service,
// F: Fn(T::Response, &U) -> Fut,
// Fut: Future<Output = Result<Res, Err>>,
// Err: From<T::Error> + From<U::Error>,
// {
// Pipeline {
// service: AndThenApplyFn::new(self.service, service.into_service(), f),
// }
// }
/// Chain on a computation for when a call to the service finished,
/// passing the result of the call to the next service `U`.
///
@ -197,39 +174,6 @@ impl<T: ServiceFactory<R, C>, R, C> PipelineFactory<T, R, C> {
}
}
// /// Apply function to specified service and use it as a next service in
// /// chain.
// ///
// /// Short version of `pipeline_factory(...).and_then(apply_fn_factory(...))`
// pub fn and_then_apply_fn<U, I, F, Fut, Res, Err>(
// self,
// factory: I,
// f: F,
// ) -> PipelineFactory<
// impl ServiceFactory<
// Request = T::Request,
// Response = Res,
// Error = Err,
// Config = T::Config,
// InitError = T::InitError,
// Service = impl Service<Request = T::Request, Response = Res, Error = Err>
// + Clone,
// > + Clone,
// >
// where
// Self: Sized,
// T::Config: Clone,
// I: IntoServiceFactory<U>,
// U: ServiceFactory<Config = T::Config, InitError = T::InitError>,
// F: Fn(T::Response, &U::Service) -> Fut + Clone,
// Fut: Future<Output = Result<Res, Err>>,
// Err: From<T::Error> + From<U::Error>,
// {
// PipelineFactory {
// factory: AndThenApplyFnFactory::new(self.factory, factory.into_factory(), f),
// }
// }
/// Apply transform to current service factory.
///
/// Short version of `apply(transform, pipeline_factory(...))`