Memory pools support for http (#65)

* memory pools support for http
This commit is contained in:
Nikolay Kim 2021-12-06 08:07:19 +06:00 committed by GitHub
parent 2ab64627a8
commit 0315d92401
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 120 additions and 229 deletions

View file

@ -1,10 +1,10 @@
# Changes # Changes
## 0.1.6 (2021-12-03) ## [0.1.6] (2021-12-03)
* Better api usability * Better api usability
## 0.1.5 (2021-12-02) ## [0.1.5] (2021-12-02)
* Split,freeze,truncate operations produce inline Bytes object if possible * Split,freeze,truncate operations produce inline Bytes object if possible
@ -12,21 +12,21 @@
* Introduce memory pools * Introduce memory pools
## 0.1.4 (2021-06-27) ## [0.1.4] (2021-06-27)
* Reduce size of Option<Bytes> by using NonNull * Reduce size of Option<Bytes> by using NonNull
## 0.1.2 (2021-06-27) ## [0.1.2] (2021-06-27)
* Reserve space for put_slice * Reserve space for put_slice
## 0.1.1 (2021-06-27) ## [0.1.1] (2021-06-27)
* Add `ByteString::as_slice()` method * Add `ByteString::as_slice()` method
* Enable serde * Enable serde
## 0.1.0 (2021-06-27) ## [0.1.0] (2021-06-27)
* Add `Bytes::trimdown()` method * Add `Bytes::trimdown()` method
@ -34,91 +34,4 @@
* Remove unused code * Remove unused code
* Project fork * Project fork from 0.4 version
# 0.4.12 (March 6, 2019)
### Added
- Implement `FromIterator<&'a u8>` for `BytesMut`/`Bytes` (#244).
- Implement `Buf` for `VecDeque` (#249).
# 0.4.11 (November 17, 2018)
* Use raw pointers for potentially racy loads (#233).
* Implement `BufRead` for `buf::Reader` (#232).
* Documentation tweaks (#234).
# 0.4.10 (September 4, 2018)
* impl `Buf` and `BufMut` for `Either` (#225).
* Add `Bytes::slice_ref` (#208).
# 0.4.9 (July 12, 2018)
* Add 128 bit number support behind a feature flag (#209).
* Implement `IntoBuf` for `&mut [u8]`
# 0.4.8 (May 25, 2018)
* Fix panic in `BytesMut` `FromIterator` implementation.
* Bytes: Recycle space when reserving space in vec mode (#197).
* Bytes: Add resize fn (#203).
# 0.4.7 (April 27, 2018)
* Make `Buf` and `BufMut` usable as trait objects (#186).
* impl BorrowMut for BytesMut (#185).
* Improve accessor performance (#195).
# 0.4.6 (Janary 8, 2018)
* Implement FromIterator for Bytes/BytesMut (#148).
* Add `advance` fn to Bytes/BytesMut (#166).
* Add `unsplit` fn to `BytesMut` (#162, #173).
* Improvements to Bytes split fns (#92).
# 0.4.5 (August 12, 2017)
* Fix range bug in `Take::bytes`
* Misc performance improvements
* Add extra `PartialEq` implementations.
* Add `Bytes::with_capacity`
* Implement `AsMut[u8]` for `BytesMut`
# 0.4.4 (May 26, 2017)
* Add serde support behind feature flag
* Add `extend_from_slice` on `Bytes` and `BytesMut`
* Add `truncate` and `clear` on `Bytes`
* Misc additional std trait implementations
* Misc performance improvements
# 0.4.3 (April 30, 2017)
* Fix Vec::advance_mut bug
* Bump minimum Rust version to 1.15
* Misc performance tweaks
# 0.4.2 (April 5, 2017)
* Misc performance tweaks
* Improved `Debug` implementation for `Bytes`
* Avoid some incorrect assert panics
# 0.4.1 (March 15, 2017)
* Expose `buf` module and have most types available from there vs. root.
* Implement `IntoBuf` for `T: Buf`.
* Add `FromBuf` and `Buf::collect`.
* Add iterator adapter for `Buf`.
* Add scatter/gather support to `Buf` and `BufMut`.
* Add `Buf::chain`.
* Reduce allocations on repeated calls to `BytesMut::reserve`.
* Implement `Debug` for more types.
* Remove `Source` in favor of `IntoBuf`.
* Implement `Extend` for `BytesMut`.
# 0.4.0 (February 24, 2017)
* Initial release

View file

@ -48,7 +48,7 @@ struct MemoryPool {
window_waiters: Cell<usize>, window_waiters: Cell<usize>,
windows: Cell<[(usize, usize); 10]>, windows: Cell<[(usize, usize); 10]>,
// io read/write bytesmut cache and params // io read/write cache and params
read_wm: Cell<BufParams>, read_wm: Cell<BufParams>,
read_cache: RefCell<Vec<BytesMut>>, read_cache: RefCell<Vec<BytesMut>>,
write_wm: Cell<BufParams>, write_wm: Cell<BufParams>,
@ -454,6 +454,16 @@ impl Drop for Pool {
} }
} }
impl fmt::Debug for Pool {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Pool")
.field("id", &self.id().0)
.field("allocated", &self.inner.size.load(Relaxed))
.field("ready", &!self.is_pending())
.finish()
}
}
impl Pool { impl Pool {
#[inline] #[inline]
/// Get pool id. /// Get pool id.
@ -491,11 +501,14 @@ impl Pool {
#[inline] #[inline]
pub fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> { pub fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
if self.inner.max_size.get() > 0 { if self.inner.max_size.get() > 0 {
let allocated = self.inner.size.load(Relaxed); let window_l = self.inner.window_l.get();
if window_l == 0 {
return Poll::Ready(());
}
// lower than low // lower than low
let window_l = self.inner.window_l.get(); let allocated = self.inner.size.load(Relaxed);
if window_l == 0 || allocated < window_l { if allocated < window_l {
let idx = self.idx.get(); let idx = self.idx.get();
if idx > 0 { if idx > 0 {
// cleanup waiter // cleanup waiter
@ -507,7 +520,7 @@ impl Pool {
return Poll::Ready(()); return Poll::Ready(());
} }
// register waiter // register waiter only if spawn fn is provided
if let Some(spawn) = &*self.inner.spawn.borrow() { if let Some(spawn) = &*self.inner.spawn.borrow() {
let idx = self.idx.get(); let idx = self.idx.get();
let mut flags = self.inner.flags.get(); let mut flags = self.inner.flags.get();
@ -519,15 +532,21 @@ impl Pool {
waiters.update(idx - 1, ctx.waker().clone()) waiters.update(idx - 1, ctx.waker().clone())
}; };
if flags.contains(Flags::INCREASED) || !new { // if memory usage has increased since last window change,
// block all readyness check. otherwise wake up one existing waiter
if new {
if !flags.contains(Flags::INCREASED) {
if let Some(waker) = waiters.consume() {
waker.wake()
}
} else {
self.inner self.inner
.window_waiters .window_waiters
.set(self.inner.window_waiters.get() + 1); .set(self.inner.window_waiters.get() + 1);
} else if let Some(waker) = waiters.consume() { }
waker.wake();
} }
// start driver task // start driver task if needed
if !flags.contains(Flags::SPAWNED) { if !flags.contains(Flags::SPAWNED) {
flags.insert(Flags::SPAWNED); flags.insert(Flags::SPAWNED);
self.inner.flags.set(flags); self.inner.flags.set(flags);

View file

@ -11,7 +11,7 @@ mod tree;
pub use self::de::PathDeserializer; pub use self::de::PathDeserializer;
pub use self::path::{Path, PathIter}; pub use self::path::{Path, PathIter};
pub use self::resource::ResourceDef; pub use self::resource::ResourceDef;
pub use self::router::{ResourceInfo, Router, RouterBuilder}; pub use self::router::{Router, RouterBuilder};
pub trait Resource<T: ResourcePath> { pub trait Resource<T: ResourcePath> {
fn path(&self) -> &str; fn path(&self) -> &str;

View file

@ -56,7 +56,6 @@ pub(crate) enum Segment {
Dynamic { Dynamic {
pattern: Regex, pattern: Regex,
names: Vec<&'static str>, names: Vec<&'static str>,
len: usize,
tail: bool, tail: bool,
}, },
} }
@ -375,7 +374,6 @@ impl ResourceDef {
names, names,
tail, tail,
pattern: re, pattern: re,
len: 0,
}); });
pattern = rem; pattern = rem;
@ -401,7 +399,6 @@ impl ResourceDef {
pattern, pattern,
names: Vec::new(), names: Vec::new(),
tail: true, tail: true,
len: 0,
}); });
} else { } else {
pelems.push(Segment::Static(pattern.to_string())); pelems.push(Segment::Static(pattern.to_string()));
@ -720,7 +717,6 @@ mod tests {
let seg2 = Segment::Dynamic { let seg2 = Segment::Dynamic {
pattern: Regex::new("test").unwrap(), pattern: Regex::new("test").unwrap(),
names: Vec::new(), names: Vec::new(),
len: 1,
tail: false, tail: false,
}; };
assert!(seg != seg2); assert!(seg != seg2);

View file

@ -4,12 +4,6 @@ use super::{IntoPattern, Resource, ResourceDef, ResourcePath};
#[derive(Debug, Copy, Clone, PartialEq)] #[derive(Debug, Copy, Clone, PartialEq)]
pub struct ResourceId(pub u16); pub struct ResourceId(pub u16);
/// Information about current resource
#[derive(Clone, Debug)]
pub struct ResourceInfo {
resource: ResourceId,
}
/// Resource router. /// Resource router.
#[derive(Clone)] #[derive(Clone)]
pub struct Router<T, U = ()> { pub struct Router<T, U = ()> {

View file

@ -5,7 +5,7 @@ use super::path::PathItem;
use super::resource::{ResourceDef, Segment}; use super::resource::{ResourceDef, Segment};
use super::{Resource, ResourcePath}; use super::{Resource, ResourcePath};
#[derive(Debug, Clone)] #[derive(Debug, Clone, Default)]
pub(super) struct Tree { pub(super) struct Tree {
key: Vec<Segment>, key: Vec<Segment>,
items: Vec<Item>, items: Vec<Item>,
@ -43,15 +43,6 @@ impl Value {
} }
} }
impl Default for Tree {
fn default() -> Tree {
Tree {
key: Vec::new(),
items: Vec::new(),
}
}
}
impl Tree { impl Tree {
pub(crate) fn new(resource: &ResourceDef, value: usize) -> Tree { pub(crate) fn new(resource: &ResourceDef, value: usize) -> Tree {
if resource.tp.is_empty() { if resource.tp.is_empty() {

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.4.12] - 2021-12-06
* http: Use memory pools
## [0.4.11] - 2021-12-02 ## [0.4.11] - 2021-12-02
* framed: Use memory pools * framed: Use memory pools

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex" name = "ntex"
version = "0.4.11" version = "0.4.12"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services" description = "Framework for composable network services"
readme = "README.md" readme = "README.md"
@ -49,7 +49,7 @@ ntex-router = "0.5.1"
ntex-service = "0.2.1" ntex-service = "0.2.1"
ntex-macros = "0.1.3" ntex-macros = "0.1.3"
ntex-util = "0.1.1" ntex-util = "0.1.1"
ntex-bytes = "0.1.5" ntex-bytes = "0.1.6"
base64 = "0.13" base64 = "0.13"
bitflags = "1.3" bitflags = "1.3"

View file

@ -15,7 +15,7 @@ enum IoWriteState {
enum Shutdown { enum Shutdown {
None, None,
Flushed, Flushed,
Shutdown, Stopping,
} }
/// Write io task /// Write io task
@ -116,7 +116,7 @@ where
match Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx) match Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx)
{ {
Poll::Ready(Ok(_)) => { Poll::Ready(Ok(_)) => {
*st = Shutdown::Shutdown; *st = Shutdown::Stopping;
continue; continue;
} }
Poll::Ready(Err(_)) => { Poll::Ready(Err(_)) => {
@ -129,7 +129,7 @@ where
_ => (), _ => (),
} }
} }
Shutdown::Shutdown => { Shutdown::Stopping => {
// read until 0 or err // read until 0 or err
let mut buf = [0u8; 512]; let mut buf = [0u8; 512];
let mut io = this.io.borrow_mut(); let mut io = this.io.borrow_mut();

View file

@ -12,6 +12,7 @@ use crate::http::response::Response;
use crate::http::service::HttpService; use crate::http::service::HttpService;
use crate::service::{boxed, IntoService, IntoServiceFactory, Service, ServiceFactory}; use crate::service::{boxed, IntoService, IntoServiceFactory, Service, ServiceFactory};
use crate::time::{Millis, Seconds}; use crate::time::{Millis, Seconds};
use crate::util::PoolId;
/// A http service builder /// A http service builder
/// ///
@ -22,9 +23,7 @@ pub struct HttpServiceBuilder<T, S, X = ExpectHandler, U = UpgradeHandler<T>> {
client_timeout: Millis, client_timeout: Millis,
client_disconnect: Seconds, client_disconnect: Seconds,
handshake_timeout: Millis, handshake_timeout: Millis,
lw: u16, pool: PoolId,
read_hw: u16,
write_hw: u16,
expect: X, expect: X,
upgrade: Option<U>, upgrade: Option<U>,
on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>, on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
@ -40,9 +39,7 @@ impl<T, S> HttpServiceBuilder<T, S, ExpectHandler, UpgradeHandler<T>> {
client_timeout: Millis::from_secs(3), client_timeout: Millis::from_secs(3),
client_disconnect: Seconds(3), client_disconnect: Seconds(3),
handshake_timeout: Millis::from_secs(5), handshake_timeout: Millis::from_secs(5),
lw: 1024, pool: PoolId::P1,
read_hw: 8 * 1024,
write_hw: 8 * 1024,
expect: ExpectHandler, expect: ExpectHandler,
upgrade: None, upgrade: None,
on_connect: None, on_connect: None,
@ -116,19 +113,27 @@ where
self self
} }
/// Set memory pool.
///
/// Use specified memory pool for memory allocations. By default P1
/// memory pool is used.
pub fn memory_pool(mut self, id: PoolId) -> Self {
self.pool = id;
self
}
#[doc(hidden)]
#[deprecated(since = "0.4.12", note = "Use memory pool config")]
#[inline] #[inline]
/// Set read/write buffer params /// Set read/write buffer params
/// ///
/// By default read buffer is 8kb, write buffer is 8kb /// By default read buffer is 8kb, write buffer is 8kb
pub fn buffer_params( pub fn buffer_params(
mut self, self,
max_read_buf_size: u16, _max_read_buf_size: u16,
max_write_buf_size: u16, _max_write_buf_size: u16,
min_buf_size: u16, _min_buf_size: u16,
) -> Self { ) -> Self {
self.read_hw = max_read_buf_size;
self.write_hw = max_write_buf_size;
self.lw = min_buf_size;
self self
} }
@ -151,13 +156,11 @@ where
client_timeout: self.client_timeout, client_timeout: self.client_timeout,
client_disconnect: self.client_disconnect, client_disconnect: self.client_disconnect,
handshake_timeout: self.handshake_timeout, handshake_timeout: self.handshake_timeout,
pool: self.pool,
expect: expect.into_factory(), expect: expect.into_factory(),
upgrade: self.upgrade, upgrade: self.upgrade,
on_connect: self.on_connect, on_connect: self.on_connect,
on_request: self.on_request, on_request: self.on_request,
lw: self.lw,
read_hw: self.read_hw,
write_hw: self.write_hw,
_t: PhantomData, _t: PhantomData,
} }
} }
@ -184,13 +187,11 @@ where
client_timeout: self.client_timeout, client_timeout: self.client_timeout,
client_disconnect: self.client_disconnect, client_disconnect: self.client_disconnect,
handshake_timeout: self.handshake_timeout, handshake_timeout: self.handshake_timeout,
pool: self.pool,
expect: self.expect, expect: self.expect,
upgrade: Some(upgrade.into_factory()), upgrade: Some(upgrade.into_factory()),
on_connect: self.on_connect, on_connect: self.on_connect,
on_request: self.on_request, on_request: self.on_request,
lw: self.lw,
read_hw: self.read_hw,
write_hw: self.write_hw,
_t: PhantomData, _t: PhantomData,
} }
} }
@ -239,9 +240,7 @@ where
self.client_timeout, self.client_timeout,
self.client_disconnect, self.client_disconnect,
self.handshake_timeout, self.handshake_timeout,
self.lw, self.pool,
self.read_hw,
self.write_hw,
); );
H1Service::with_config(cfg, service.into_factory()) H1Service::with_config(cfg, service.into_factory())
.expect(self.expect) .expect(self.expect)
@ -265,9 +264,7 @@ where
self.client_timeout, self.client_timeout,
self.client_disconnect, self.client_disconnect,
self.handshake_timeout, self.handshake_timeout,
self.lw, self.pool,
self.read_hw,
self.write_hw,
); );
H2Service::with_config(cfg, service.into_factory()).on_connect(self.on_connect) H2Service::with_config(cfg, service.into_factory()).on_connect(self.on_connect)
} }
@ -288,9 +285,7 @@ where
self.client_timeout, self.client_timeout,
self.client_disconnect, self.client_disconnect,
self.handshake_timeout, self.handshake_timeout,
self.lw, self.pool,
self.read_hw,
self.write_hw,
); );
HttpService::with_config(cfg, service.into_factory()) HttpService::with_config(cfg, service.into_factory())
.expect(self.expect) .expect(self.expect)

View file

@ -4,7 +4,7 @@ use crate::framed::Timer;
use crate::http::{Request, Response}; use crate::http::{Request, Response};
use crate::service::boxed::BoxService; use crate::service::boxed::BoxService;
use crate::time::{sleep, Millis, Seconds, Sleep}; use crate::time::{sleep, Millis, Seconds, Sleep};
use crate::util::{BytesMut, PoolRef}; use crate::util::{BytesMut, PoolId};
#[derive(Debug, PartialEq, Clone, Copy)] #[derive(Debug, PartialEq, Clone, Copy)]
/// Server keep-alive setting /// Server keep-alive setting
@ -44,9 +44,7 @@ pub(super) struct Inner {
pub(super) timer: DateService, pub(super) timer: DateService,
pub(super) ssl_handshake_timeout: Millis, pub(super) ssl_handshake_timeout: Millis,
pub(super) timer_h1: Timer, pub(super) timer_h1: Timer,
pub(super) lw: u16, pub(super) pool: PoolId,
pub(super) read_hw: u16,
pub(super) write_hw: u16,
} }
impl Clone for ServiceConfig { impl Clone for ServiceConfig {
@ -62,9 +60,7 @@ impl Default for ServiceConfig {
Millis::ZERO, Millis::ZERO,
Seconds::ZERO, Seconds::ZERO,
Millis(5_000), Millis(5_000),
1024, PoolId::P1,
8 * 1024,
8 * 1024,
) )
} }
} }
@ -76,9 +72,7 @@ impl ServiceConfig {
client_timeout: Millis, client_timeout: Millis,
client_disconnect: Seconds, client_disconnect: Seconds,
ssl_handshake_timeout: Millis, ssl_handshake_timeout: Millis,
lw: u16, pool: PoolId,
read_hw: u16,
write_hw: u16,
) -> ServiceConfig { ) -> ServiceConfig {
let (keep_alive, ka_enabled) = match keep_alive { let (keep_alive, ka_enabled) = match keep_alive {
KeepAlive::Timeout(val) => (Millis::from(val), true), KeepAlive::Timeout(val) => (Millis::from(val), true),
@ -87,19 +81,13 @@ impl ServiceConfig {
}; };
let keep_alive = if ka_enabled { keep_alive } else { Millis::ZERO }; let keep_alive = if ka_enabled { keep_alive } else { Millis::ZERO };
PoolRef::default()
.set_read_params(read_hw, lw)
.set_write_params(write_hw, lw);
ServiceConfig(Rc::new(Inner { ServiceConfig(Rc::new(Inner {
keep_alive, keep_alive,
ka_enabled, ka_enabled,
client_timeout, client_timeout,
client_disconnect, client_disconnect,
ssl_handshake_timeout, ssl_handshake_timeout,
lw, pool,
read_hw,
write_hw,
timer: DateService::new(), timer: DateService::new(),
timer_h1: Timer::default(), timer_h1: Timer::default(),
})) }))
@ -118,9 +106,7 @@ pub(super) struct DispatcherConfig<T, S, X, U> {
pub(super) ka_enabled: bool, pub(super) ka_enabled: bool,
pub(super) timer: DateService, pub(super) timer: DateService,
pub(super) timer_h1: Timer, pub(super) timer_h1: Timer,
pub(super) lw: u16, pub(super) pool: PoolId,
pub(super) read_hw: u16,
pub(super) write_hw: u16,
pub(super) on_request: Option<OnRequest<T>>, pub(super) on_request: Option<OnRequest<T>>,
} }
@ -143,9 +129,7 @@ impl<T, S, X, U> DispatcherConfig<T, S, X, U> {
ka_enabled: cfg.0.ka_enabled, ka_enabled: cfg.0.ka_enabled,
timer: cfg.0.timer.clone(), timer: cfg.0.timer.clone(),
timer_h1: cfg.0.timer_h1.clone(), timer_h1: cfg.0.timer_h1.clone(),
lw: cfg.0.lw, pool: cfg.0.pool,
read_hw: cfg.0.read_hw,
write_hw: cfg.0.write_hw,
} }
} }

View file

@ -113,12 +113,8 @@ where
on_connect_data: Option<Box<dyn DataFactory>>, on_connect_data: Option<Box<dyn DataFactory>>,
) -> Self { ) -> Self {
let codec = Codec::new(config.timer.clone(), config.keep_alive_enabled()); let codec = Codec::new(config.timer.clone(), config.keep_alive_enabled());
let state = IoState::with_params( let state = IoState::with_memory_pool(config.pool.into());
config.read_hw, state.set_disconnect_timeout(config.client_disconnect);
config.write_hw,
config.lw,
config.client_disconnect,
);
let mut expire = config.timer_h1.now(); let mut expire = config.timer_h1.now();
let io = Rc::new(RefCell::new(io)); let io = Rc::new(RefCell::new(io));
@ -992,7 +988,7 @@ mod tests {
let mut h1 = h1(server, |_| { let mut h1 = h1(server, |_| {
Box::pin(async { Ok::<_, io::Error>(Response::Ok().finish()) }) Box::pin(async { Ok::<_, io::Error>(Response::Ok().finish()) })
}); });
crate::util::PoolRef::default() crate::util::PoolId::P1
.set_read_params(15 * 1024, 1024) .set_read_params(15 * 1024, 1024)
.set_write_params(15 * 1024, 1024); .set_write_params(15 * 1024, 1024);

View file

@ -12,7 +12,7 @@ use crate::http::helpers::DataFactory;
use crate::http::request::Request; use crate::http::request::Request;
use crate::http::response::Response; use crate::http::response::Response;
use crate::service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; use crate::service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory};
use crate::{rt::net::TcpStream, time::Millis}; use crate::{rt::net::TcpStream, time::Millis, util::Pool};
use super::codec::Codec; use super::codec::Codec;
use super::dispatcher::Dispatcher; use super::dispatcher::Dispatcher;
@ -326,8 +326,10 @@ where
let config = Rc::new(DispatcherConfig::new( let config = Rc::new(DispatcherConfig::new(
cfg, service, expect, upgrade, on_request, cfg, service, expect, upgrade, on_request,
)); ));
let pool = config.pool.into();
Ok(H1ServiceHandler { Ok(H1ServiceHandler {
pool,
config, config,
on_connect, on_connect,
_t: marker::PhantomData, _t: marker::PhantomData,
@ -338,6 +340,7 @@ where
/// `Service` implementation for HTTP1 transport /// `Service` implementation for HTTP1 transport
pub struct H1ServiceHandler<T, S: Service, B, X: Service, U: Service> { pub struct H1ServiceHandler<T, S: Service, B, X: Service, U: Service> {
pool: Pool,
config: Rc<DispatcherConfig<T, S, X, U>>, config: Rc<DispatcherConfig<T, S, X, U>>,
on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>, on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
_t: marker::PhantomData<(T, B)>, _t: marker::PhantomData<(T, B)>,
@ -397,6 +400,8 @@ where
ready ready
}; };
let ready = self.pool.poll_ready(cx).is_ready() && ready;
if ready { if ready {
task::Poll::Ready(Ok(())) task::Poll::Ready(Ok(()))
} else { } else {

View file

@ -10,7 +10,7 @@ use crate::framed::State;
use crate::rt::net::TcpStream; use crate::rt::net::TcpStream;
use crate::service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; use crate::service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory};
use crate::time::{Millis, Seconds}; use crate::time::{Millis, Seconds};
use crate::util::Bytes; use crate::util::{Bytes, Pool, PoolId};
use super::body::MessageBody; use super::body::MessageBody;
use super::builder::HttpServiceBuilder; use super::builder::HttpServiceBuilder;
@ -65,9 +65,7 @@ where
Millis(5_000), Millis(5_000),
Seconds::ZERO, Seconds::ZERO,
Millis(5_000), Millis(5_000),
1024, PoolId::P1,
8 * 1024,
8 * 1024,
); );
HttpService { HttpService {
@ -412,8 +410,10 @@ where
let config = let config =
DispatcherConfig::new(cfg, service, expect, upgrade, on_request); DispatcherConfig::new(cfg, service, expect, upgrade, on_request);
let pool = config.pool.into();
Ok(HttpServiceHandler { Ok(HttpServiceHandler {
pool,
on_connect, on_connect,
config: Rc::new(config), config: Rc::new(config),
_t: marker::PhantomData, _t: marker::PhantomData,
@ -424,6 +424,7 @@ where
/// `Service` implementation for http transport /// `Service` implementation for http transport
pub struct HttpServiceHandler<T, S: Service, B, X: Service, U: Service> { pub struct HttpServiceHandler<T, S: Service, B, X: Service, U: Service> {
pool: Pool,
config: Rc<DispatcherConfig<T, S, X, U>>, config: Rc<DispatcherConfig<T, S, X, U>>,
on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>, on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
_t: marker::PhantomData<(T, B, X)>, _t: marker::PhantomData<(T, B, X)>,
@ -481,6 +482,8 @@ where
ready ready
}; };
let ready = self.pool.poll_ready(cx).is_ready() && ready;
if ready { if ready {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} else { } else {

View file

@ -498,7 +498,6 @@ impl Timer {
#[derive(Debug)] #[derive(Debug)]
struct Bucket { struct Bucket {
lvl: u32, lvl: u32,
offs: u32,
bit: u64, bit: u64,
bit_n: u64, bit_n: u64,
entries: Slab<usize>, entries: Slab<usize>,
@ -516,7 +515,6 @@ impl Bucket {
Bucket { Bucket {
bit, bit,
lvl: lvl as u32, lvl: lvl as u32,
offs: offs as u32,
bit_n: !bit, bit_n: !bit,
entries: Slab::default(), entries: Slab::default(),
} }

View file

@ -2,7 +2,7 @@ use std::task::{Context, Poll};
use std::{cell::RefCell, future::Future, marker::PhantomData, pin::Pin, rc::Rc}; use std::{cell::RefCell, future::Future, marker::PhantomData, pin::Pin, rc::Rc};
use crate::http::{Request, Response}; use crate::http::{Request, Response};
use crate::router::{Path, ResourceDef, ResourceInfo, Router}; use crate::router::{Path, ResourceDef, Router};
use crate::service::boxed::{self, BoxService, BoxServiceFactory}; use crate::service::boxed::{self, BoxService, BoxServiceFactory};
use crate::service::{fn_service, PipelineFactory, Service, ServiceFactory, Transform}; use crate::service::{fn_service, PipelineFactory, Service, ServiceFactory, Transform};
use crate::util::Extensions; use crate::util::Extensions;
@ -141,7 +141,6 @@ where
} }
let routing = AppRouting { let routing = AppRouting {
ready: None,
router: router.finish(), router: router.finish(),
default: Some(default_fut.await?), default: Some(default_fut.await?),
}; };
@ -259,7 +258,6 @@ where
struct AppRouting<Err: ErrorRenderer> { struct AppRouting<Err: ErrorRenderer> {
router: Router<HttpService<Err>, Guards>, router: Router<HttpService<Err>, Guards>,
ready: Option<(WebRequest<Err>, ResourceInfo)>,
default: Option<HttpService<Err>>, default: Option<HttpService<Err>>,
} }
@ -271,11 +269,7 @@ impl<Err: ErrorRenderer> Service for AppRouting<Err> {
#[inline] #[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.ready.is_none() {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} else {
Poll::Pending
}
} }
fn call(&self, mut req: WebRequest<Err>) -> Self::Future { fn call(&self, mut req: WebRequest<Err>) -> Self::Future {

View file

@ -3,7 +3,7 @@ use std::{
}; };
use crate::http::Response; use crate::http::Response;
use crate::router::{IntoPattern, ResourceDef, ResourceInfo, Router}; use crate::router::{IntoPattern, ResourceDef, Router};
use crate::service::boxed::{self, BoxService, BoxServiceFactory}; use crate::service::boxed::{self, BoxService, BoxServiceFactory};
use crate::service::{pipeline_factory, PipelineFactory}; use crate::service::{pipeline_factory, PipelineFactory};
use crate::service::{Identity, IntoServiceFactory, Service, ServiceFactory, Transform}; use crate::service::{Identity, IntoServiceFactory, Service, ServiceFactory, Transform};
@ -635,7 +635,6 @@ impl<Err: ErrorRenderer> ServiceFactory for ScopeRouterFactory<Err> {
data, data,
default, default,
router: router.finish(), router: router.finish(),
_ready: None,
}) })
}) })
} }
@ -645,7 +644,6 @@ struct ScopeRouter<Err: ErrorRenderer> {
data: Option<Rc<Extensions>>, data: Option<Rc<Extensions>>,
router: Router<HttpService<Err>, Vec<Box<dyn Guard>>>, router: Router<HttpService<Err>, Vec<Box<dyn Guard>>>,
default: Option<HttpService<Err>>, default: Option<HttpService<Err>>,
_ready: Option<(WebRequest<Err>, ResourceInfo)>,
} }
impl<Err: ErrorRenderer> Service for ScopeRouter<Err> { impl<Err: ErrorRenderer> Service for ScopeRouter<Err> {

View file

@ -13,8 +13,8 @@ use crate::http::{
use crate::server::{Server, ServerBuilder}; use crate::server::{Server, ServerBuilder};
#[cfg(unix)] #[cfg(unix)]
use crate::service::pipeline_factory; use crate::service::pipeline_factory;
use crate::time::Seconds;
use crate::{service::map_config, IntoServiceFactory, Service, ServiceFactory}; use crate::{service::map_config, IntoServiceFactory, Service, ServiceFactory};
use crate::{time::Seconds, util::PoolId};
use super::config::AppConfig; use super::config::AppConfig;
@ -24,9 +24,7 @@ struct Config {
client_timeout: Seconds, client_timeout: Seconds,
client_disconnect: Seconds, client_disconnect: Seconds,
handshake_timeout: Seconds, handshake_timeout: Seconds,
lw: u16, pool: PoolId,
read_hw: u16,
write_hw: u16,
} }
/// An HTTP Server. /// An HTTP Server.
@ -86,9 +84,7 @@ where
client_timeout: Seconds(5), client_timeout: Seconds(5),
client_disconnect: Seconds(5), client_disconnect: Seconds(5),
handshake_timeout: Seconds(5), handshake_timeout: Seconds(5),
lw: 1024, pool: PoolId::P1,
read_hw: 8 * 1024,
write_hw: 8 * 1024,
})), })),
backlog: 1024, backlog: 1024,
builder: ServerBuilder::default(), builder: ServerBuilder::default(),
@ -229,22 +225,27 @@ where
self self
} }
/// Set memory pool.
///
/// Use specified memory pool for memory allocations. By default P1
/// memory pool is used.
pub fn memory_pool(self, id: PoolId) -> Self {
self.config.lock().unwrap().pool = id;
self
}
#[doc(hidden)]
#[deprecated(since = "0.4.12", note = "Use memory pool config")]
#[inline] #[inline]
/// Set read/write buffer params /// Set read/write buffer params
/// ///
/// By default read buffer is 8kb, write buffer is 8kb /// By default read buffer is 8kb, write buffer is 8kb
pub fn buffer_params( pub fn buffer_params(
self, self,
max_read_buf_size: u16, _max_read_buf_size: u16,
max_write_buf_size: u16, _max_write_buf_size: u16,
min_buf_size: u16, _min_buf_size: u16,
) -> Self { ) -> Self {
{
let mut cfg = self.config.lock().unwrap();
cfg.read_hw = max_read_buf_size;
cfg.write_hw = max_write_buf_size;
cfg.lw = min_buf_size;
}
self self
} }
@ -272,7 +273,7 @@ where
.keep_alive(c.keep_alive) .keep_alive(c.keep_alive)
.client_timeout(c.client_timeout) .client_timeout(c.client_timeout)
.disconnect_timeout(c.client_disconnect) .disconnect_timeout(c.client_disconnect)
.buffer_params(c.read_hw, c.write_hw, c.lw) .memory_pool(c.pool)
.finish(map_config(factory(), move |_| cfg.clone())) .finish(map_config(factory(), move |_| cfg.clone()))
.tcp() .tcp()
}, },
@ -317,7 +318,7 @@ where
.client_timeout(c.client_timeout) .client_timeout(c.client_timeout)
.disconnect_timeout(c.client_disconnect) .disconnect_timeout(c.client_disconnect)
.ssl_handshake_timeout(c.handshake_timeout) .ssl_handshake_timeout(c.handshake_timeout)
.buffer_params(c.read_hw, c.write_hw, c.lw) .memory_pool(c.pool)
.finish(map_config(factory(), move |_| cfg.clone())) .finish(map_config(factory(), move |_| cfg.clone()))
.openssl(acceptor.clone()) .openssl(acceptor.clone())
}, },
@ -362,7 +363,7 @@ where
.client_timeout(c.client_timeout) .client_timeout(c.client_timeout)
.disconnect_timeout(c.client_disconnect) .disconnect_timeout(c.client_disconnect)
.ssl_handshake_timeout(c.handshake_timeout) .ssl_handshake_timeout(c.handshake_timeout)
.buffer_params(c.read_hw, c.write_hw, c.lw) .memory_pool(c.pool)
.finish(map_config(factory(), move |_| cfg.clone())) .finish(map_config(factory(), move |_| cfg.clone()))
.rustls(config.clone()) .rustls(config.clone())
}, },
@ -485,7 +486,7 @@ where
HttpService::build() HttpService::build()
.keep_alive(c.keep_alive) .keep_alive(c.keep_alive)
.client_timeout(c.client_timeout) .client_timeout(c.client_timeout)
.buffer_params(c.read_hw, c.write_hw, c.lw) .memory_pool(c.pool)
.finish(map_config(factory(), move |_| config.clone())), .finish(map_config(factory(), move |_| config.clone())),
) )
})?; })?;
@ -526,7 +527,7 @@ where
HttpService::build() HttpService::build()
.keep_alive(c.keep_alive) .keep_alive(c.keep_alive)
.client_timeout(c.client_timeout) .client_timeout(c.client_timeout)
.buffer_params(c.read_hw, c.write_hw, c.lw) .memory_pool(c.pool)
.finish(map_config(factory(), move |_| config.clone())), .finish(map_config(factory(), move |_| config.clone())),
) )
}, },