Add timer service

This commit is contained in:
Nikolay Kim 2021-08-27 22:07:44 +06:00
parent 0c9edb0fa3
commit 73c9672fee
43 changed files with 1015 additions and 365 deletions

View file

@ -1,6 +1,6 @@
# Changes
## [0.3.0] - 2021-08-26
## [0.3.0] - 2021-08-27
* Do not use/re-export tokio::time::Instant

View file

@ -6,6 +6,8 @@ mod arbiter;
mod builder;
mod runtime;
mod system;
#[doc(hidden)]
pub mod time;
pub use self::arbiter::Arbiter;

View file

@ -1,9 +1,11 @@
# Changes
## [0.4.0-b.3] - 2021-08-xx
## [0.4.0-b.3] - 2021-08-27
* Add timer service
* Use ntex-rt 0.3
* Use ntex-service 0.2
## [0.4.0-b.2] - 2021-08-14

View file

@ -509,8 +509,8 @@ mod tests {
use std::sync::{atomic::AtomicBool, atomic::Ordering::Relaxed, Arc, Mutex};
use crate::codec::BytesCodec;
use crate::rt::time::sleep;
use crate::testing::Io;
use crate::time::sleep;
use crate::util::Bytes;
use super::*;
@ -578,7 +578,7 @@ mod tests {
server,
BytesCodec,
crate::fn_service(|msg: DispatchItem<BytesCodec>| async move {
sleep(Duration::from_millis(50)).await;
sleep(50).await;
if let DispatchItem::Item(msg) = msg {
Ok::<_, ()>(Some(msg.freeze()))
} else {
@ -629,7 +629,7 @@ mod tests {
assert_eq!(buf, Bytes::from_static(b"test"));
st.close();
sleep(Duration::from_millis(200)).await;
sleep(200).await;
assert!(client.is_server_dropped());
}
@ -716,7 +716,7 @@ mod tests {
let buf = client.read_any();
assert_eq!(buf, Bytes::from_static(b""));
client.write("GET /test HTTP/1\r\n\r\n");
sleep(Duration::from_millis(25)).await;
sleep(25).await;
// buf must be consumed
assert_eq!(client.remote_buffer(|buf| buf.len()), 0);
@ -726,11 +726,11 @@ mod tests {
assert_eq!(state.write().with_buf(|buf| buf.len()), 65536);
client.remote_buffer_cap(10240);
sleep(Duration::from_millis(50)).await;
sleep(50).await;
assert_eq!(state.write().with_buf(|buf| buf.len()), 55296);
client.remote_buffer_cap(45056);
sleep(Duration::from_millis(50)).await;
sleep(50).await;
assert_eq!(state.write().with_buf(|buf| buf.len()), 10240);
// backpressure disabled
@ -776,7 +776,7 @@ mod tests {
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"));
sleep(Duration::from_millis(3100)).await;
sleep(3100).await;
// write side must be closed, dispatcher should fail with keep-alive
let flags = state.flags();
@ -802,7 +802,7 @@ mod tests {
crate::fn_service(move |msg: DispatchItem<BytesCodec>| {
handled2.store(true, Relaxed);
async move {
sleep(Duration::from_millis(50)).await;
sleep(50).await;
if let DispatchItem::Item(msg) = msg {
Ok::<_, ()>(Some(msg.freeze()))
} else {
@ -815,7 +815,7 @@ mod tests {
crate::rt::spawn(async move {
let _ = disp.await;
});
sleep(Duration::from_millis(50)).await;
sleep(50).await;
assert!(handled.load(Relaxed));
}

View file

@ -1,19 +1,19 @@
use std::{cell::RefCell, collections::BTreeMap, rc::Rc, time::Duration, time::Instant};
use std::{cell::RefCell, collections::BTreeMap, rc::Rc, time::Instant};
use crate::framed::State;
use crate::rt::time::sleep;
use crate::time::sleep;
use crate::util::HashSet;
pub struct Timer(Rc<RefCell<Inner>>);
struct Inner {
resolution: Duration,
resolution: u64,
current: Option<Instant>,
notifications: BTreeMap<Instant, HashSet<State>>,
}
impl Inner {
fn new(resolution: Duration) -> Self {
fn new(resolution: u64) -> Self {
Inner {
resolution,
current: None,
@ -39,12 +39,13 @@ impl Clone for Timer {
impl Default for Timer {
fn default() -> Self {
Timer::with(Duration::from_secs(1))
Timer::with(1_000)
}
}
impl Timer {
pub fn with(resolution: Duration) -> Timer {
/// Create new timer with resolution in milliseconds
pub fn with(resolution: u64) -> Timer {
Timer(Rc::new(RefCell::new(Inner::new(resolution))))
}

View file

@ -1,14 +1,14 @@
use std::task::{Context, Poll};
use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc, time::Duration};
use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc};
use crate::codec::{AsyncRead, AsyncWrite, ReadBuf};
use crate::framed::State;
use crate::rt::time::{sleep, Sleep};
use crate::time::{sleep, Sleep};
#[derive(Debug)]
enum IoWriteState {
Processing,
Shutdown(Option<Pin<Box<Sleep>>>, Shutdown),
Shutdown(Option<Sleep>, Shutdown),
}
#[derive(Debug)]
@ -46,7 +46,7 @@ where
let disconnect_timeout = state.get_disconnect_timeout() as u64;
let st = IoWriteState::Shutdown(
if disconnect_timeout != 0 {
Some(Box::pin(sleep(Duration::from_millis(disconnect_timeout))))
Some(sleep(disconnect_timeout))
} else {
None
},
@ -83,9 +83,7 @@ where
let disconnect_timeout = this.state.get_disconnect_timeout() as u64;
this.st = IoWriteState::Shutdown(
if disconnect_timeout != 0 {
Some(Box::pin(sleep(Duration::from_millis(
disconnect_timeout,
))))
Some(sleep(disconnect_timeout))
} else {
None
},

View file

@ -1,7 +1,4 @@
use std::convert::TryFrom;
use std::fmt;
use std::rc::Rc;
use std::time::Duration;
use std::{convert::TryFrom, fmt, rc::Rc};
use crate::http::error::HttpError;
use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue};
@ -36,7 +33,7 @@ impl ClientBuilder {
max_redirects: 10,
config: ClientConfig {
headers: HeaderMap::new(),
timeout: Some(Duration::from_secs(5)),
timeout: 5_000,
connector: Box::new(ConnectorWrapper(Connector::default().finish())),
},
}
@ -54,18 +51,18 @@ impl ClientBuilder {
self
}
/// Set request timeout
/// Set request timeout in seconds
///
/// Request timeout is the total time before a response must be received.
/// Default value is 5 seconds.
pub fn timeout(mut self, timeout: Duration) -> Self {
self.config.timeout = Some(timeout);
pub fn timeout(mut self, timeout: u16) -> Self {
self.config.timeout = ((timeout as u64) * 1000) as u64;
self
}
/// Disable request timeout.
pub fn disable_timeout(mut self) -> Self {
self.config.timeout = None;
self.config.timeout = 0;
self
}

View file

@ -33,14 +33,14 @@ type BoxedConnector =
/// use ntex::http::client::Connector;
///
/// let connector = Connector::default()
/// .timeout(Duration::from_secs(5))
/// .timeout(5_000)
/// .finish();
/// ```
pub struct Connector {
timeout: Duration,
timeout: u64,
conn_lifetime: Duration,
conn_keep_alive: Duration,
disconnect_timeout: Duration,
disconnect_timeout_ms: u64,
limit: usize,
connector: BoxedConnector,
ssl_connector: Option<BoxedConnector>,
@ -64,10 +64,10 @@ impl Connector {
.map_err(ConnectError::from),
),
ssl_connector: None,
timeout: Duration::from_secs(1),
timeout: 1_000,
conn_lifetime: Duration::from_secs(75),
conn_keep_alive: Duration::from_secs(15),
disconnect_timeout: Duration::from_millis(3000),
disconnect_timeout_ms: 3_000,
limit: 100,
};
@ -99,9 +99,11 @@ impl Connector {
}
impl Connector {
/// Connection timeout, i.e. max time to connect to remote host including dns name resolution.
/// Connection timeout in milliseconds.
///
/// i.e. max time to connect to remote host including dns name resolution.
/// Set to 1 second by default.
pub fn timeout(mut self, timeout: Duration) -> Self {
pub fn timeout(mut self, timeout: u64) -> Self {
self.timeout = timeout;
self
}
@ -177,7 +179,7 @@ impl Connector {
self
}
/// Set server connection disconnect timeout.
/// Set server connection disconnect timeout in milliseconds.
///
/// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
/// within this time, the socket get dropped. This timeout affects only secure connections.
@ -185,8 +187,8 @@ impl Connector {
/// To disable timeout set value to 0.
///
/// By default disconnect timeout is set to 3000 milliseconds.
pub fn disconnect_timeout(mut self, dur: Duration) -> Self {
self.disconnect_timeout = dur;
pub fn disconnect_timeout(mut self, dur: u64) -> Self {
self.disconnect_timeout_ms = dur;
self
}
@ -241,7 +243,7 @@ impl Connector {
srv,
self.conn_lifetime,
self.conn_keep_alive,
self.disconnect_timeout,
self.disconnect_timeout_ms,
self.limit,
))
} else {
@ -253,7 +255,7 @@ impl Connector {
tcp_service,
self.conn_lifetime,
self.conn_keep_alive,
self.disconnect_timeout,
self.disconnect_timeout_ms,
self.limit,
),
ssl_pool,
@ -263,14 +265,14 @@ impl Connector {
fn connector(
connector: BoxedConnector,
timeout: Duration,
timeout: u64,
) -> impl Service<
Request = Connect,
Response = (Box<dyn Io>, Protocol),
Error = ConnectError,
Future = impl Unpin,
> + Unpin {
TimeoutService::new(
TimeoutService::from_millis(
timeout,
apply_fn(connector, |msg: Connect, srv| {
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))

View file

@ -1,4 +1,4 @@
use std::{convert::TryFrom, error::Error, fmt, net, rc::Rc, time::Duration};
use std::{convert::TryFrom, error::Error, fmt, net, rc::Rc};
use crate::http::body::Body;
use crate::http::error::HttpError;
@ -16,7 +16,7 @@ pub struct FrozenClientRequest {
pub(super) head: Rc<RequestHead>,
pub(super) addr: Option<net::SocketAddr>,
pub(super) response_decompress: bool,
pub(super) timeout: Option<Duration>,
pub(super) timeout: u64,
pub(super) config: Rc<ClientConfig>,
}

View file

@ -16,7 +16,7 @@
//! println!("Response: {:?}", response);
//! }
//! ```
use std::{convert::TryFrom, rc::Rc, time::Duration};
use std::{convert::TryFrom, rc::Rc};
mod builder;
mod connect;
@ -77,7 +77,7 @@ pub struct Client(Rc<ClientConfig>);
pub(self) struct ClientConfig {
pub(self) connector: Box<dyn InnerConnect>,
pub(self) headers: HeaderMap,
pub(self) timeout: Option<Duration>,
pub(self) timeout: u64,
}
impl Default for Client {
@ -85,7 +85,7 @@ impl Default for Client {
Client(Rc::new(ClientConfig {
connector: Box::new(ConnectorWrapper(Connector::default().finish())),
headers: HeaderMap::new(),
timeout: Some(Duration::from_secs(5)),
timeout: 5_000,
}))
}
}

View file

@ -8,9 +8,10 @@ use http::uri::Authority;
use crate::channel::pool;
use crate::codec::{AsyncRead, AsyncWrite, ReadBuf};
use crate::http::Protocol;
use crate::rt::{spawn, time::sleep, time::Sleep};
use crate::rt::spawn;
use crate::service::Service;
use crate::task::LocalWaker;
use crate::time::{sleep, Sleep};
use crate::util::{poll_fn, Bytes, HashMap};
use super::connection::{ConnectionType, IoConnection};
@ -30,7 +31,6 @@ impl From<Authority> for Key {
type Waiter<Io> = pool::Sender<Result<IoConnection<Io>, ConnectError>>;
type WaiterReceiver<Io> = pool::Receiver<Result<IoConnection<Io>, ConnectError>>;
const ZERO: Duration = Duration::from_millis(0);
/// Connections pool
pub(super) struct ConnectionPool<T, Io: 'static>(Rc<T>, Rc<RefCell<Inner<Io>>>);
@ -47,14 +47,14 @@ where
connector: T,
conn_lifetime: Duration,
conn_keep_alive: Duration,
disconnect_timeout: Duration,
disconnect_timeout_ms: u64,
limit: usize,
) -> Self {
let connector = Rc::new(connector);
let inner = Rc::new(RefCell::new(Inner {
conn_lifetime,
conn_keep_alive,
disconnect_timeout,
disconnect_timeout_ms,
limit,
acquired: 0,
waiters: VecDeque::new(),
@ -180,7 +180,7 @@ struct AvailableConnection<Io> {
pub(super) struct Inner<Io> {
conn_lifetime: Duration,
conn_keep_alive: Duration,
disconnect_timeout: Duration,
disconnect_timeout_ms: u64,
limit: usize,
acquired: usize,
available: HashMap<Key, VecDeque<AvailableConnection<Io>>>,
@ -246,7 +246,7 @@ where
|| (now - conn.created) > self.conn_lifetime
{
if let ConnectionType::H1(io) = conn.io {
CloseConnection::spawn(io, self.disconnect_timeout);
CloseConnection::spawn(io, self.disconnect_timeout_ms);
}
} else {
let mut io = conn.io;
@ -257,7 +257,10 @@ where
Poll::Pending => (),
Poll::Ready(Ok(_)) if !read_buf.filled().is_empty() => {
if let ConnectionType::H1(io) = io {
CloseConnection::spawn(io, self.disconnect_timeout);
CloseConnection::spawn(
io,
self.disconnect_timeout_ms,
);
}
continue;
}
@ -287,7 +290,7 @@ where
fn release_close(&mut self, io: ConnectionType<Io>) {
self.acquired -= 1;
if let ConnectionType::H1(io) = io {
CloseConnection::spawn(io, self.disconnect_timeout);
CloseConnection::spawn(io, self.disconnect_timeout_ms);
}
self.check_availibility();
}
@ -363,22 +366,19 @@ where
}
}
pin_project_lite::pin_project! {
struct CloseConnection<T> {
io: T,
#[pin]
timeout: Option<Sleep>,
shutdown: bool,
}
struct CloseConnection<T> {
io: T,
timeout: Option<Sleep>,
shutdown: bool,
}
impl<T> CloseConnection<T>
where
T: AsyncWrite + AsyncRead + Unpin + 'static,
{
fn spawn(io: T, timeout: Duration) {
let timeout = if timeout != ZERO {
Some(sleep(timeout))
fn spawn(io: T, timeout_ms: u64) {
let timeout = if timeout_ms != 0 {
Some(sleep(timeout_ms))
} else {
None
};
@ -396,19 +396,19 @@ where
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut this = self.project();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut this = self.as_mut();
// shutdown WRITE side
match Pin::new(&mut this.io).poll_shutdown(cx) {
Poll::Ready(Ok(())) => *this.shutdown = true,
Poll::Ready(Ok(())) => this.shutdown = true,
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(_)) => return Poll::Ready(()),
}
// read until 0 or err
if let Some(timeout) = this.timeout.as_pin_mut() {
match timeout.poll(cx) {
if let Some(ref timeout) = this.timeout {
match timeout.poll_elapsed(cx) {
Poll::Ready(_) => (),
Poll::Pending => {
let mut buf = [0u8; 512];
@ -611,7 +611,7 @@ mod tests {
use std::{cell::RefCell, convert::TryFrom, rc::Rc, time::Duration};
use super::*;
use crate::rt::time::sleep;
use crate::time::sleep;
use crate::{
http::client::Connection, http::Uri, service::fn_service, testing::Io,
util::lazy,
@ -630,7 +630,7 @@ mod tests {
}),
Duration::from_secs(10),
Duration::from_secs(10),
Duration::from_millis(0),
0,
1,
)
.clone();
@ -671,7 +671,7 @@ mod tests {
let mut fut = pool.call(req.clone());
assert!(lazy(|cx| Pin::new(&mut fut).poll(cx)).await.is_pending());
drop(fut);
sleep(Duration::from_millis(50)).await;
sleep(50).await;
pool.1.borrow_mut().check_availibility();
assert!(pool.1.borrow().waiters.is_empty());

View file

@ -1,4 +1,4 @@
use std::{convert::TryFrom, error::Error, fmt, net, rc::Rc, time::Duration};
use std::{convert::TryFrom, error::Error, fmt, net, rc::Rc};
#[cfg(feature = "cookie")]
use coo_kie::{Cookie, CookieJar};
@ -51,7 +51,7 @@ pub struct ClientRequest {
#[cfg(feature = "cookie")]
cookies: Option<CookieJar>,
response_decompress: bool,
timeout: Option<Duration>,
timeout: u64,
config: Rc<ClientConfig>,
}
@ -69,7 +69,7 @@ impl ClientRequest {
addr: None,
#[cfg(feature = "cookie")]
cookies: None,
timeout: None,
timeout: 0,
response_decompress: true,
}
.method(method)
@ -309,12 +309,12 @@ impl ClientRequest {
self
}
/// Set request timeout. Overrides client wide timeout setting.
/// Set request timeout in secs. Overrides client wide timeout setting.
///
/// Request timeout is the total time before a response must be received.
/// Default value is 5 seconds.
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
pub fn timeout(mut self, timeout: u16) -> Self {
self.timeout = ((timeout as u64) * 1000) as u64;
self
}

View file

@ -1,5 +1,5 @@
use std::task::{Context, Poll};
use std::{convert::TryFrom, error::Error, future::Future, net, pin::Pin, time};
use std::{convert::TryFrom, error::Error, future::Future, net, pin::Pin};
use serde::Serialize;
@ -7,7 +7,7 @@ use crate::http::body::{Body, BodyStream};
use crate::http::error::HttpError;
use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue};
use crate::http::RequestHeadType;
use crate::rt::time::{sleep, Sleep};
use crate::time::{sleep, Sleep};
use crate::{util::Bytes, Stream};
#[cfg(feature = "compress")]
@ -48,7 +48,7 @@ impl From<PrepForSendingError> for SendRequestError {
pub enum SendClientRequest {
Fut(
Pin<Box<dyn Future<Output = Result<ClientResponse, SendRequestError>>>>,
Option<Pin<Box<Sleep>>>,
Option<Sleep>,
bool,
),
Err(Option<SendRequestError>),
@ -58,9 +58,13 @@ impl SendClientRequest {
pub(crate) fn new(
send: Pin<Box<dyn Future<Output = Result<ClientResponse, SendRequestError>>>>,
response_decompress: bool,
timeout: Option<time::Duration>,
timeout: u64,
) -> SendClientRequest {
let delay = timeout.map(|d| Box::pin(sleep(d)));
let delay = if timeout != 0 {
Some(sleep(timeout))
} else {
None
};
SendClientRequest::Fut(send, delay, response_decompress)
}
}
@ -74,7 +78,7 @@ impl Future for SendClientRequest {
match this {
SendClientRequest::Fut(send, delay, _response_decompress) => {
if delay.is_some() {
match Pin::new(delay.as_mut().unwrap()).poll(cx) {
match delay.as_ref().unwrap().poll_elapsed(cx) {
Poll::Pending => (),
_ => return Poll::Ready(Err(SendRequestError::Timeout)),
}
@ -130,17 +134,21 @@ impl RequestHeadType {
self,
addr: Option<net::SocketAddr>,
response_decompress: bool,
timeout: Option<time::Duration>,
mut timeout: u64,
config: &ClientConfig,
body: B,
) -> SendClientRequest
where
B: Into<Body>,
{
if timeout == 0 {
timeout = config.timeout;
}
SendClientRequest::new(
config.connector.send_request(self, body.into(), addr),
response_decompress,
timeout.or(config.timeout),
timeout,
)
}
@ -148,7 +156,7 @@ impl RequestHeadType {
mut self,
addr: Option<net::SocketAddr>,
response_decompress: bool,
timeout: Option<time::Duration>,
timeout: u64,
config: &ClientConfig,
value: &T,
) -> SendClientRequest {
@ -175,7 +183,7 @@ impl RequestHeadType {
mut self,
addr: Option<net::SocketAddr>,
response_decompress: bool,
timeout: Option<time::Duration>,
timeout: u64,
config: &ClientConfig,
value: &T,
) -> SendClientRequest {
@ -205,7 +213,7 @@ impl RequestHeadType {
self,
addr: Option<net::SocketAddr>,
response_decompress: bool,
timeout: Option<time::Duration>,
timeout: u64,
config: &ClientConfig,
stream: S,
) -> SendClientRequest
@ -226,7 +234,7 @@ impl RequestHeadType {
self,
addr: Option<net::SocketAddr>,
response_decompress: bool,
timeout: Option<time::Duration>,
timeout: u64,
config: &ClientConfig,
) -> SendClientRequest {
self.send_body(addr, response_decompress, timeout, config, Body::Empty)

View file

@ -12,7 +12,7 @@ use crate::http::header::{self, HeaderName, HeaderValue, AUTHORIZATION};
use crate::http::{ConnectionType, Payload, RequestHead, StatusCode, Uri};
use crate::service::{apply_fn, into_service, IntoService, Service};
use crate::util::Either;
use crate::{channel::mpsc, rt, rt::time::timeout, util::sink, util::Ready, ws};
use crate::{channel::mpsc, rt, time::timeout, util::sink, util::Ready, ws};
pub use crate::ws::{CloseCode, CloseReason, Frame, Message};
@ -311,8 +311,8 @@ impl WsRequest {
let fut = self.config.connector.open_tunnel(head.into(), self.addr);
// set request timeout
let (head, framed) = if let Some(to) = self.config.timeout {
timeout(to, fut)
let (head, framed) = if self.config.timeout > 0 {
timeout(self.config.timeout, fut)
.await
.map_err(|_| SendRequestError::Timeout)
.and_then(|res| res)?

View file

@ -2,8 +2,8 @@ use std::{cell::Cell, cell::RefCell, ptr::copy_nonoverlapping, rc::Rc, time};
use crate::framed::Timer;
use crate::http::{Request, Response};
use crate::rt::time::{sleep, sleep_until, Sleep};
use crate::service::boxed::BoxService;
use crate::time::{sleep, Sleep};
use crate::util::BytesMut;
#[derive(Debug, PartialEq, Clone, Copy)]
@ -104,7 +104,7 @@ pub(super) struct DispatcherConfig<T, S, X, U> {
pub(super) service: S,
pub(super) expect: X,
pub(super) upgrade: Option<U>,
pub(super) keep_alive: time::Duration,
pub(super) keep_alive: u64,
pub(super) client_timeout: u64,
pub(super) client_disconnect: u64,
pub(super) ka_enabled: bool,
@ -129,7 +129,7 @@ impl<T, S, X, U> DispatcherConfig<T, S, X, U> {
expect,
upgrade,
on_request,
keep_alive: time::Duration::from_secs(cfg.0.keep_alive),
keep_alive: cfg.0.keep_alive * 1000,
client_timeout: cfg.0.client_timeout,
client_disconnect: cfg.0.client_disconnect,
ka_enabled: cfg.0.ka_enabled,
@ -148,8 +148,8 @@ impl<T, S, X, U> DispatcherConfig<T, S, X, U> {
/// Return keep-alive timer Sleep is configured.
pub(super) fn keep_alive_timer(&self) -> Option<Sleep> {
if self.keep_alive.as_secs() != 0 {
Some(sleep_until(self.timer.now() + self.keep_alive))
if self.keep_alive != 0 {
Some(sleep(self.keep_alive))
} else {
None
}
@ -157,8 +157,8 @@ impl<T, S, X, U> DispatcherConfig<T, S, X, U> {
/// Keep-alive expire time
pub(super) fn keep_alive_expire(&self) -> Option<time::Instant> {
if self.keep_alive.as_secs() != 0 {
Some(self.timer.now() + self.keep_alive)
if self.keep_alive != 0 {
Some(self.timer.now() + time::Duration::from_millis(self.keep_alive))
} else {
None
}
@ -223,13 +223,13 @@ impl DateService {
// periodic date update
let s = self.clone();
crate::rt::spawn(async move {
sleep(time::Duration::from_millis(500)).await;
sleep(500).await;
s.0.current.set(false);
});
}
}
fn now(&self) -> time::Instant {
pub(super) fn now(&self) -> time::Instant {
self.check_date();
self.0.current_time.get()
}

View file

@ -536,9 +536,9 @@ where
fn reset_keepalive(&mut self) {
// re-register keep-alive
if self.flags.contains(Flags::KEEPALIVE) && self.config.keep_alive.as_secs() != 0
{
let expire = self.config.timer_h1.now() + self.config.keep_alive;
if self.flags.contains(Flags::KEEPALIVE) && self.config.keep_alive != 0 {
let expire = self.config.timer_h1.now()
+ time::Duration::from_millis(self.config.keep_alive);
if expire != self.expire {
self.config
.timer_h1
@ -740,7 +740,7 @@ mod tests {
use crate::http::{body, Request, ResponseHead, StatusCode};
use crate::service::{boxed, fn_service, IntoService};
use crate::util::{lazy, next, Bytes, BytesMut};
use crate::{codec::Decoder, rt::time::sleep, testing::Io};
use crate::{codec::Decoder, testing::Io, time::sleep};
const BUFFER_SIZE: usize = 32_768;
@ -823,10 +823,10 @@ mod tests {
None,
None,
);
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready());
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;
client.local_buffer(|buf| assert_eq!(&buf[..15], b"HTTP/1.0 200 OK"));
client.close().await;
@ -842,11 +842,11 @@ mod tests {
let mut h1 = h1(server, |_| {
Box::pin(async { Ok::<_, io::Error>(Response::Ok().finish()) })
});
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready());
assert!(!h1.inner.state.is_open());
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;
client
.local_buffer(|buf| assert_eq!(&buf[..26], b"HTTP/1.1 400 Bad Request\r\n"));
@ -897,7 +897,7 @@ mod tests {
});
client.write("GET /test HTTP/1.1\r\ncontent-length: 5\r\n\r\n");
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;
client.write("xxxxx");
let mut buf = client.read().await.unwrap();
@ -921,7 +921,7 @@ mod tests {
client.remote_buffer_cap(4096);
let mut decoder = ClientCodec::default();
spawn_h1(server, |_| async {
sleep(time::Duration::from_millis(100)).await;
sleep(100).await;
Ok::<_, io::Error>(Response::Ok().finish())
});
@ -933,7 +933,7 @@ mod tests {
client.write("GET /test HTTP/1.1\r\n\r\n");
client.write("GET /test HTTP/1.1\r\n\r\n");
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;
client.write("GET /test HTTP/1.1\r\n\r\n");
let mut buf = client.read().await.unwrap();
@ -998,10 +998,10 @@ mod tests {
.collect::<String>();
client.write("GET /test HTTP/1.1\r\nContent-Length: ");
client.write(data);
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready());
assert!(!h1.inner.state.is_open());
@ -1024,13 +1024,13 @@ mod tests {
let _ = next(&mut pl).await.unwrap().unwrap();
m.store(true, Ordering::Relaxed);
// sleep
sleep(time::Duration::from_secs(999_999)).await;
sleep(999_999_000).await;
Ok::<_, io::Error>(Response::Ok().finish())
}
});
client.write("GET /test HTTP/1.1\r\nContent-Length: 1048576\r\n\r\n");
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;
// buf must be consumed
assert_eq!(client.remote_buffer(|buf| buf.len()), 0);
@ -1040,7 +1040,7 @@ mod tests {
(0..1_048_576).map(|_| rand::random::<u8>()).collect();
client.write(random_bytes);
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;
assert!(client.remote_buffer(|buf| buf.len()) > 1_048_576 - BUFFER_SIZE * 3);
assert!(mark.load(Ordering::Relaxed));
}
@ -1083,7 +1083,7 @@ mod tests {
// do not allow to write to socket
client.remote_buffer_cap(0);
client.write("GET /test HTTP/1.1\r\n\r\n");
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
// buf must be consumed
@ -1096,7 +1096,7 @@ mod tests {
assert_eq!(state.write().with_buf(|buf| buf.len()), 65629);
client.remote_buffer_cap(65536);
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;
assert_eq!(state.write().with_buf(|buf| buf.len()), 93);
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
@ -1138,7 +1138,7 @@ mod tests {
});
client.write("GET /test HTTP/1.1\r\n\r\n");
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
// http message must be consumed
@ -1150,7 +1150,7 @@ mod tests {
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
client.close().await;
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready());
}
@ -1165,10 +1165,10 @@ mod tests {
Err::<Response<()>, _>(io::Error::new(io::ErrorKind::Other, "error"))
})
});
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready());
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;
assert!(h1.inner.state.is_io_err());
let buf = client.local_buffer(|buf| buf.split().freeze());
assert_eq!(&buf[..28], b"HTTP/1.1 500 Internal Server");

View file

@ -17,7 +17,7 @@ use crate::http::message::ResponseHead;
use crate::http::payload::Payload;
use crate::http::request::Request;
use crate::http::response::Response;
use crate::rt::time::Sleep;
use crate::time::Sleep;
use crate::util::{Bytes, BytesMut};
use crate::Service;
@ -53,9 +53,13 @@ where
) -> Self {
// keep-alive timer
let (ka_expire, ka_timer) = if let Some(delay) = timeout {
(delay.deadline(), Some(delay))
let expire =
config.timer.now() + time::Duration::from_millis(config.keep_alive);
(expire, Some(delay))
} else if let Some(delay) = config.keep_alive_timer() {
(delay.deadline(), Some(delay))
let expire =
config.timer.now() + time::Duration::from_millis(config.keep_alive);
(expire, Some(delay))
} else {
(config.now(), None)
};

View file

@ -1,5 +1,5 @@
//! Test helpers to use during testing.
use std::{convert::TryFrom, io, net, str::FromStr, sync::mpsc, thread, time};
use std::{convert::TryFrom, io, net, str::FromStr, sync::mpsc, thread};
#[cfg(feature = "cookie")]
use coo_kie::{Cookie, CookieJar};
@ -243,22 +243,17 @@ pub fn server<F: StreamServiceFactory<TcpStream>>(factory: F) -> TestServer {
.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e));
Connector::default()
.timeout(time::Duration::from_millis(30000))
.timeout(30_000)
.openssl(builder.build())
.finish()
}
#[cfg(not(feature = "openssl"))]
{
Connector::default()
.timeout(time::Duration::from_millis(30000))
.finish()
Connector::default().timeout(30).finish()
}
};
Client::build()
.timeout(time::Duration::from_millis(30000))
.connector(connector)
.finish()
Client::build().timeout(30).connector(connector).finish()
};
TestServer {

View file

@ -5,8 +5,8 @@ use std::{
use log::{error, info};
use slab::Slab;
use crate::rt::time::sleep_until;
use crate::rt::System;
use crate::time::sleep;
use super::socket::{Listener, SocketAddr};
use super::worker::{Connection, WorkerClient};
@ -15,7 +15,7 @@ use super::{Server, ServerStatus, Token};
const DELTA: usize = 100;
const NOTIFY: mio::Token = mio::Token(0);
const ERR_TIMEOUT: Duration = Duration::from_millis(500);
const ERR_SLEEP_TIMEOUT: Duration = Duration::from_millis(525);
const ERR_SLEEP_TIMEOUT: u64 = 525;
#[derive(Debug)]
pub(super) enum Command {
@ -459,7 +459,7 @@ impl Accept {
let notify = self.notify.clone();
System::current().arbiter().spawn(Box::pin(async move {
sleep_until(Instant::now() + ERR_SLEEP_TIMEOUT).await;
sleep(ERR_SLEEP_TIMEOUT).await;
notify.send(Command::Timer);
}));
return;

View file

@ -1,5 +1,5 @@
use std::task::{Context, Poll};
use std::{future::Future, io, mem, net, pin::Pin, time::Duration};
use std::{future::Future, io, mem, net, pin::Pin};
use async_channel::{unbounded, Receiver};
use async_oneshot as oneshot;
@ -7,8 +7,8 @@ use futures_core::Stream;
use log::{error, info};
use socket2::{Domain, SockAddr, Socket, Type};
use crate::rt::{net::TcpStream, spawn, time::sleep, System};
use crate::util::join_all;
use crate::rt::{net::TcpStream, spawn, System};
use crate::{time::sleep, util::join_all};
use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::config::{ConfiguredService, ServiceConfig};
@ -18,7 +18,7 @@ use super::socket::Listener;
use super::worker::{self, Worker, WorkerAvailability, WorkerClient};
use super::{Server, ServerCommand, ServerStatus, Token};
const STOP_DELAY: Duration = Duration::from_millis(300);
const STOP_DELAY: u64 = 300;
/// Server builder
pub struct ServerBuilder {
@ -30,7 +30,7 @@ pub struct ServerBuilder {
sockets: Vec<(Token, String, Listener)>,
accept: AcceptLoop,
exit: bool,
shutdown_timeout: Duration,
shutdown_timeout: u64,
no_signals: bool,
cmd: Receiver<ServerCommand>,
server: Server,
@ -58,7 +58,7 @@ impl ServerBuilder {
accept: AcceptLoop::new(server.clone()),
backlog: 2048,
exit: false,
shutdown_timeout: Duration::from_secs(30),
shutdown_timeout: 30_000,
no_signals: false,
cmd: rx,
notify: Vec::new(),
@ -125,7 +125,7 @@ impl ServerBuilder {
///
/// By default shutdown timeout sets to 30 seconds.
pub fn shutdown_timeout(mut self, sec: u64) -> Self {
self.shutdown_timeout = Duration::from_secs(sec);
self.shutdown_timeout = sec * 1000;
self
}
@ -521,8 +521,7 @@ mod tests {
#[cfg(unix)]
#[crate::rt_test]
async fn test_signals() {
use std::sync::mpsc;
use std::{net, thread, time};
use std::{net, sync::mpsc, thread};
fn start(tx: mpsc::Sender<(Server, net::SocketAddr)>) -> thread::JoinHandle<()> {
thread::spawn(move || {
@ -552,11 +551,11 @@ mod tests {
let h = start(tx);
let (srv, addr) = rx.recv().unwrap();
crate::rt::time::sleep(time::Duration::from_millis(300)).await;
crate::time::sleep(300).await;
assert!(net::TcpStream::connect(addr).is_ok());
srv.signal(*sig);
crate::rt::time::sleep(time::Duration::from_millis(300)).await;
crate::time::sleep(300).await;
assert!(net::TcpStream::connect(addr).is_err());
let _ = h.join();
}

View file

@ -1,23 +1,22 @@
use std::task::{Context, Poll};
use std::{error::Error, fmt, future::Future, io, marker, pin::Pin, time};
use std::{error::Error, fmt, future::Future, io, marker, pin::Pin};
pub use open_ssl::ssl::{self, AlpnError, Ssl, SslAcceptor, SslAcceptorBuilder};
pub use tokio_openssl::SslStream;
use crate::codec::{AsyncRead, AsyncWrite};
use crate::rt::time::{sleep, Sleep};
use crate::service::{Service, ServiceFactory};
use crate::util::counter::{Counter, CounterGuard};
use crate::util::Ready;
use crate::time::{sleep, Sleep};
use crate::util::{counter::Counter, counter::CounterGuard, Ready};
use super::{MAX_SSL_ACCEPT_COUNTER, ZERO};
use super::MAX_SSL_ACCEPT_COUNTER;
/// Support `TLS` server connections via openssl package
///
/// `openssl` feature enables `Acceptor` type
pub struct Acceptor<T: AsyncRead + AsyncWrite> {
acceptor: SslAcceptor,
timeout: time::Duration,
timeout: u64,
io: marker::PhantomData<T>,
}
@ -26,7 +25,7 @@ impl<T: AsyncRead + AsyncWrite> Acceptor<T> {
pub fn new(acceptor: SslAcceptor) -> Self {
Acceptor {
acceptor,
timeout: time::Duration::from_secs(5),
timeout: 5_000,
io: marker::PhantomData,
}
}
@ -35,7 +34,7 @@ impl<T: AsyncRead + AsyncWrite> Acceptor<T> {
///
/// Default is set to 5 seconds.
pub fn timeout(mut self, time: u64) -> Self {
self.timeout = time::Duration::from_millis(time);
self.timeout = time;
self
}
}
@ -77,7 +76,7 @@ where
pub struct AcceptorService<T> {
acceptor: SslAcceptor,
conns: Counter,
timeout: time::Duration,
timeout: u64,
io: marker::PhantomData<T>,
}
@ -106,7 +105,7 @@ where
AcceptorServiceResponse {
_guard: self.conns.get(),
io: None,
delay: if self.timeout == ZERO {
delay: if self.timeout == 0 {
None
} else {
Some(sleep(self.timeout))
@ -116,28 +115,25 @@ where
}
}
pin_project_lite::pin_project! {
pub struct AcceptorServiceResponse<T>
where
T: AsyncRead,
T: AsyncWrite,
{
io: Option<SslStream<T>>,
#[pin]
delay: Option<Sleep>,
io_factory: Option<Result<SslStream<T>, open_ssl::error::ErrorStack>>,
_guard: CounterGuard,
}
pub struct AcceptorServiceResponse<T>
where
T: AsyncRead,
T: AsyncWrite,
{
io: Option<SslStream<T>>,
delay: Option<Sleep>,
io_factory: Option<Result<SslStream<T>, open_ssl::error::ErrorStack>>,
_guard: CounterGuard,
}
impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceResponse<T> {
type Output = Result<SslStream<T>, Box<dyn Error>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
if let Some(delay) = this.delay.as_pin_mut() {
match delay.poll(cx) {
if let Some(ref delay) = this.delay {
match delay.poll_elapsed(cx) {
Poll::Pending => (),
Poll::Ready(_) => {
return Poll::Ready(Err(Box::new(io::Error::new(
@ -149,7 +145,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceResponse<T> {
}
match this.io_factory.take() {
Some(Ok(io)) => *this.io = Some(io),
Some(Ok(io)) => this.io = Some(io),
Some(Err(err)) => return Poll::Ready(Err(Box::new(err))),
None => (),
}

View file

@ -1,5 +1,5 @@
use std::task::{Context, Poll};
use std::{error::Error, future::Future, io, marker, pin::Pin, sync::Arc, time};
use std::{error::Error, future::Future, io, marker, pin::Pin, sync::Arc};
use tokio_rustls::{Accept, TlsAcceptor};
@ -8,18 +8,18 @@ pub use tokio_rustls::server::TlsStream;
pub use webpki_roots::TLS_SERVER_ROOTS;
use crate::codec::{AsyncRead, AsyncWrite};
use crate::rt::time::{sleep, Sleep};
use crate::service::{Service, ServiceFactory};
use crate::time::{sleep, Sleep};
use crate::util::counter::{Counter, CounterGuard};
use crate::util::Ready;
use super::{MAX_SSL_ACCEPT_COUNTER, ZERO};
use super::MAX_SSL_ACCEPT_COUNTER;
/// Support `SSL` connections via rustls package
///
/// `rust-tls` feature enables `RustlsAcceptor` type
pub struct Acceptor<T> {
timeout: time::Duration,
timeout: u64,
config: Arc<ServerConfig>,
io: marker::PhantomData<T>,
}
@ -29,7 +29,7 @@ impl<T: AsyncRead + AsyncWrite> Acceptor<T> {
pub fn new(config: ServerConfig) -> Self {
Acceptor {
config: Arc::new(config),
timeout: time::Duration::from_secs(5),
timeout: 5000,
io: marker::PhantomData,
}
}
@ -38,7 +38,7 @@ impl<T: AsyncRead + AsyncWrite> Acceptor<T> {
///
/// Default is set to 5 seconds.
pub fn timeout(mut self, time: u64) -> Self {
self.timeout = time::Duration::from_millis(time);
self.timeout = time;
self
}
}
@ -80,7 +80,7 @@ pub struct AcceptorService<T> {
acceptor: TlsAcceptor,
io: marker::PhantomData<T>,
conns: Counter,
timeout: time::Duration,
timeout: u64,
}
impl<T: AsyncRead + AsyncWrite + Unpin> Service for AcceptorService<T> {
@ -103,7 +103,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Service for AcceptorService<T> {
AcceptorServiceFut {
_guard: self.conns.get(),
fut: self.acceptor.accept(req),
delay: if self.timeout == ZERO {
delay: if self.timeout == 0 {
None
} else {
Some(sleep(self.timeout))
@ -112,28 +112,25 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Service for AcceptorService<T> {
}
}
pin_project_lite::pin_project! {
pub struct AcceptorServiceFut<T>
where
T: AsyncRead,
T: AsyncWrite,
T: Unpin,
{
fut: Accept<T>,
#[pin]
delay: Option<Sleep>,
_guard: CounterGuard,
}
pub struct AcceptorServiceFut<T>
where
T: AsyncRead,
T: AsyncWrite,
T: Unpin,
{
fut: Accept<T>,
delay: Option<Sleep>,
_guard: CounterGuard,
}
impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceFut<T> {
type Output = Result<TlsStream<T>, Box<dyn Error>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
if let Some(delay) = this.delay.as_pin_mut() {
match delay.poll(cx) {
if let Some(ref delay) = this.delay {
match delay.poll_elapsed(cx) {
Poll::Pending => (),
Poll::Ready(_) => {
return Poll::Ready(Err(Box::new(io::Error::new(

View file

@ -1,6 +1,6 @@
use std::{
future::Future, marker::PhantomData, net::SocketAddr, pin::Pin, task::Context,
task::Poll, time::Duration,
task::Poll,
};
use log::error;
@ -16,8 +16,8 @@ use super::Token;
pub(super) enum ServerMessage {
/// New stream
Connect(Stream),
/// Gracefull shutdown
Shutdown(Duration),
/// Gracefull shutdown in millis
Shutdown(u64),
/// Force shutdown
ForceShutdown,
}

View file

@ -1,13 +1,13 @@
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::{future::Future, pin::Pin, sync::Arc, time};
use std::{future::Future, pin::Pin, sync::Arc};
use async_channel::{unbounded, Receiver, Sender};
use async_oneshot as oneshot;
use futures_core::Stream as FutStream;
use crate::rt::time::{sleep_until, Sleep};
use crate::rt::{spawn, Arbiter};
use crate::time::{sleep, Sleep};
use crate::util::{counter::Counter, join_all};
use super::accept::{AcceptNotify, Command};
@ -131,7 +131,7 @@ pub(super) struct Worker {
conns: Counter,
factories: Vec<Box<dyn InternalServiceFactory>>,
state: WorkerState,
shutdown_timeout: time::Duration,
shutdown_timeout: u64,
}
struct WorkerService {
@ -162,7 +162,7 @@ impl Worker {
idx: usize,
factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability,
shutdown_timeout: time::Duration,
shutdown_timeout: u64,
) -> WorkerClient {
let (tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded();
@ -192,7 +192,7 @@ impl Worker {
rx2: Receiver<StopCommand>,
factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability,
shutdown_timeout: time::Duration,
shutdown_timeout: u64,
) -> Result<Worker, ()> {
availability.set(false);
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker {
@ -320,11 +320,7 @@ enum WorkerState {
Token,
Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>>,
),
Shutdown(
Pin<Box<Sleep>>,
Pin<Box<Sleep>>,
Option<oneshot::Sender<bool>>,
),
Shutdown(Sleep, Sleep, Option<oneshot::Sender<bool>>),
}
impl Future for Worker {
@ -349,12 +345,8 @@ impl Future for Worker {
if num != 0 {
info!("Graceful worker shutdown, {} connections", num);
self.state = WorkerState::Shutdown(
Box::pin(sleep_until(
time::Instant::now() + time::Duration::from_secs(1),
)),
Box::pin(sleep_until(
time::Instant::now() + self.shutdown_timeout,
)),
sleep(1000),
sleep(self.shutdown_timeout),
Some(result),
);
} else {
@ -428,7 +420,7 @@ impl Future for Worker {
}
// check graceful timeout
match t2.as_mut().poll(cx) {
match t2.poll_elapsed(cx) {
Poll::Pending => (),
Poll::Ready(_) => {
let _ = tx.take().unwrap().send(false);
@ -439,13 +431,11 @@ impl Future for Worker {
}
// sleep for 1 second and then check again
match t1.as_mut().poll(cx) {
match t1.poll_elapsed(cx) {
Poll::Pending => (),
Poll::Ready(_) => {
*t1 = Box::pin(sleep_until(
time::Instant::now() + time::Duration::from_secs(1),
));
let _ = t1.as_mut().poll(cx);
*t1 = sleep(1000);
let _ = t1.poll_elapsed(cx);
}
}
Poll::Pending
@ -606,7 +596,7 @@ mod tests {
"127.0.0.1:8080".parse().unwrap(),
)],
avail.clone(),
time::Duration::from_secs(5),
5_000,
)
.await
.unwrap();
@ -678,7 +668,7 @@ mod tests {
"127.0.0.1:8080".parse().unwrap(),
)],
avail.clone(),
time::Duration::from_secs(5),
5_000,
)
.await
.unwrap();

View file

@ -1,10 +1,10 @@
use std::cell::{Cell, RefCell};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::{cmp, fmt, io, mem, pin::Pin, time};
use std::{cmp, fmt, io, mem, pin::Pin};
use crate::codec::{AsyncRead, AsyncWrite, ReadBuf};
use crate::rt::time::sleep;
use crate::time::sleep;
use crate::util::{poll_fn, BytesMut};
#[derive(Default)]
@ -169,7 +169,7 @@ impl Io {
remote.read = IoState::Close;
remote.waker.wake();
}
sleep(time::Duration::from_millis(35)).await;
sleep(35).await;
}
/// Add extra data to the remote buffer and notify reader

143
ntex/src/time/mod.rs Normal file
View file

@ -0,0 +1,143 @@
//! Utilities for tracking time.
use std::{convert::TryInto, future::Future, pin::Pin, task, task::Poll, time};
mod wheel;
pub use self::wheel::TimerHandle;
/// Waits until `duration` has elapsed.
///
/// No work is performed while awaiting on the sleep future to complete. `Sleep`
/// operates at 16.5 millisecond granularity and should not be used for tasks that
/// require high-resolution timers.
#[inline]
pub fn sleep(millis: u64) -> Sleep {
Sleep::new(millis)
}
/// Waits until `duration` has elapsed.
///
/// No work is performed while awaiting on the sleep future to complete. `Sleep`
/// operates at 16.5 millisecond granularity and should not be used for tasks that
/// require high-resolution timers.
#[inline]
pub fn sleep_duration(duration: time::Duration) -> Sleep {
Sleep::new(duration.as_millis().try_into().unwrap_or_else(|_| {
log::error!("Duration is too large {:?}", duration);
1 << 31
}))
}
/// Require a `Future` to complete before the specified duration has elapsed.
///
/// If the future completes before the duration has elapsed, then the completed
/// value is returned. Otherwise, an error is returned and the future is
/// canceled.
pub fn timeout<T>(millis: u64, future: T) -> Timeout<T>
where
T: Future,
{
Timeout::new_with_delay(future, Sleep::new(millis))
}
/// Future returned by [`sleep`](sleep).
///
/// # Examples
///
/// Wait 100ms and print "100 ms have elapsed".
///
/// ```
/// use ntex::time::sleep;
///
/// #[ntex::main]
/// async fn main() {
/// sleep(100).await;
/// println!("100 ms have elapsed");
/// }
/// ```
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Sleep {
// The link between the `Sleep` instance and the timer that drives it.
hnd: TimerHandle,
}
impl Sleep {
/// Create new sleep future
#[inline]
pub fn new(millis: u64) -> Sleep {
Sleep {
hnd: TimerHandle::new(millis),
}
}
/// Returns `true` if `Sleep` has elapsed.
#[inline]
pub fn is_elapsed(&self) -> bool {
self.hnd.is_elapsed()
}
/// Resets the `Sleep` instance to a new deadline.
///
/// Calling this function allows changing the instant at which the `Sleep`
/// future completes without having to create new associated state.
///
/// This function can be called both before and after the future has
/// completed.
pub fn reset(&self, millis: u64) {
self.hnd.reset(millis);
}
#[inline]
pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
self.hnd.poll_elapsed(cx)
}
}
impl Future for Sleep {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
self.hnd.poll_elapsed(cx)
}
}
pin_project_lite::pin_project! {
/// Future returned by [`timeout`](timeout).
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct Timeout<T> {
#[pin]
value: T,
delay: Sleep,
}
}
impl<T> Timeout<T> {
pub(crate) fn new_with_delay(value: T, delay: Sleep) -> Timeout<T> {
Timeout { value, delay }
}
}
impl<T> Future for Timeout<T>
where
T: Future,
{
type Output = Result<T::Output, ()>;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let this = self.project();
// First, try polling the future
if let Poll::Ready(v) = this.value.poll(cx) {
return Poll::Ready(Ok(v));
}
// Now check the timer
match this.delay.poll_elapsed(cx) {
Poll::Ready(()) => Poll::Ready(Err(())),
Poll::Pending => Poll::Pending,
}
}
}

511
ntex/src/time/wheel.rs Normal file
View file

@ -0,0 +1,511 @@
//! Time wheel based timer service.
//!
//! Inspired by linux kernel timers system
#![allow(arithmetic_overflow)]
use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc, task, task::Poll, time};
use slab::Slab;
use crate::rt::time::{sleep_until, Sleep};
use crate::task::LocalWaker;
// Clock divisor for the next level
const LVL_CLK_SHIFT: u64 = 3;
const LVL_CLK_DIV: u64 = 1 << LVL_CLK_SHIFT;
const LVL_CLK_MASK: u64 = LVL_CLK_DIV - 1;
const fn lvl_shift(n: u64) -> u64 {
n * LVL_CLK_SHIFT
}
const fn lvl_gran(n: u64) -> u64 {
1 << lvl_shift(n)
}
// Resolution:
// 0: 1 millis
// 4: ~17 millis
//const UNITS: u64 = 4;
const UNITS: u64 = 0;
const fn to_units(n: u64) -> u64 {
n >> UNITS
}
const fn to_millis(n: u64) -> u64 {
n << UNITS
}
// The time start value for each level to select the bucket at enqueue time
const fn lvl_start(lvl: u64) -> u64 {
(LVL_SIZE - 1) << ((lvl - 1) * LVL_CLK_SHIFT)
}
// Size of each clock level
const LVL_BITS: u64 = 6;
const LVL_SIZE: u64 = 1 << LVL_BITS;
const LVL_MASK: u64 = LVL_SIZE - 1;
// Level depth
const LVL_DEPTH: u64 = 8;
const fn lvl_offs(n: u64) -> u64 {
n * LVL_SIZE
}
// The cutoff (max. capacity of the wheel)
const WHEEL_TIMEOUT_CUTOFF: u64 = lvl_start(LVL_DEPTH);
const WHEEL_TIMEOUT_MAX: u64 = WHEEL_TIMEOUT_CUTOFF - (lvl_gran(LVL_DEPTH - 1));
const WHEEL_SIZE: usize = (LVL_SIZE as usize) * (LVL_DEPTH as usize);
#[derive(Debug)]
pub struct TimerHandle(usize);
impl TimerHandle {
pub fn new(millis: u64) -> Self {
Timer::add_timer(millis)
}
/// Resets the `TimerHandle` instance to a new deadline.
pub fn reset(&self, millis: u64) {
Timer::update_timer(self.0, millis);
}
pub fn is_elapsed(&self) -> bool {
Timer::with_entry(self.0, |entry| {
entry.flags.contains(TimerEntryFlags::ELAPSED)
})
}
pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
Timer::with_entry(self.0, |entry| {
if entry.flags.contains(TimerEntryFlags::ELAPSED) {
Poll::Ready(())
} else {
entry.task.register(cx.waker());
Poll::Pending
}
})
}
}
impl Drop for TimerHandle {
fn drop(&mut self) {
Timer::remove_timer(self.0);
}
}
bitflags::bitflags! {
pub struct Flags: u8 {
const DRIVER_STARTED = 0b0000_0001;
const NEEDS_RECALC = 0b0000_0010;
const TIMER_ACTIVE = 0b0000_0100;
}
}
struct Timer(Rc<RefCell<TimerInner>>);
thread_local! {
static TIMER: Timer = Timer::new();
}
struct TimerInner {
timers: Slab<TimerEntry>,
elapsed: u64,
elapsed_instant: time::Instant,
next_expiry: u64,
flags: Flags,
driver: LocalWaker,
buckets: Vec<Bucket>,
/// Bit field tracking which bucket currently contain entries.
occupied: [u64; WHEEL_SIZE],
}
impl Timer {
fn new() -> Self {
Timer(Rc::new(RefCell::new(TimerInner::new())))
}
fn with_entry<F, R>(no: usize, f: F) -> R
where
F: Fn(&mut TimerEntry) -> R,
{
TIMER.with(|t| f(&mut t.0.borrow_mut().timers[no]))
}
// Add the timer into the hash bucket
fn add_timer(expires: u64) -> TimerHandle {
TIMER.with(|t| TimerInner::add_timer(&t.0, expires))
}
fn update_timer(handle: usize, expires: u64) {
TIMER.with(|t| TimerInner::update_timer(&t.0, handle, expires));
}
fn remove_timer(handle: usize) {
TIMER.with(|t| t.0.borrow_mut().remove_timer(handle));
}
}
impl TimerInner {
fn new() -> Self {
let mut buckets = Vec::with_capacity(WHEEL_SIZE);
for idx in 0..WHEEL_SIZE {
let lvl = idx / (LVL_SIZE as usize);
let offs = idx % (LVL_SIZE as usize);
buckets.push(Bucket::new(lvl, offs))
}
TimerInner {
buckets,
timers: Slab::default(),
elapsed: 0,
elapsed_instant: time::Instant::now(),
next_expiry: u64::MAX,
flags: Flags::empty(),
driver: LocalWaker::new(),
occupied: [0; WHEEL_SIZE],
}
}
// Add the timer into the hash bucket
fn add_timer(inner: &Rc<RefCell<Self>>, millis: u64) -> TimerHandle {
let mut slf = inner.borrow_mut();
let delta = to_units(
(time::Instant::now() + time::Duration::from_millis(millis)
- slf.elapsed_instant)
.as_millis() as u64,
);
let (no, bucket_expiry) = {
let slf = &mut *slf;
// crate timer entry
let (idx, bucket_expiry) = slf.calc_wheel_index(slf.elapsed + delta, delta);
let entry = slf.timers.vacant_entry();
let no = entry.key();
let bucket = &mut slf.buckets[idx];
let bucket_entry = bucket.add_entry(no);
entry.insert(TimerEntry {
no,
bucket_entry,
bucket: idx as u16,
task: LocalWaker::new(),
flags: TimerEntryFlags::empty(),
});
slf.occupied[bucket.lvl] |= bucket.bit;
(no, bucket_expiry)
};
// Check whether new bucket expire earlier
if bucket_expiry < slf.next_expiry {
slf.next_expiry = bucket_expiry;
if !slf.flags.contains(Flags::DRIVER_STARTED) {
slf.flags.insert(Flags::DRIVER_STARTED);
drop(slf);
TimerDriver::start(inner);
} else {
slf.flags.insert(Flags::NEEDS_RECALC);
slf.driver.wake();
}
}
TimerHandle(no)
}
fn update_timer(inner: &Rc<RefCell<Self>>, hnd: usize, millis: u64) {
let mut slf = inner.borrow_mut();
let delta = to_units(
(time::Instant::now() + time::Duration::from_millis(millis)
- slf.elapsed_instant)
.as_millis() as u64,
);
let bucket_expiry = {
let slf = &mut *slf;
// calc buckeet
let (idx, bucket_expiry) = slf.calc_wheel_index(slf.elapsed + delta, delta);
let entry = &mut slf.timers[hnd];
// do not do anything if bucket is the same
if idx == entry.bucket as usize {
return;
}
if !entry.flags.contains(TimerEntryFlags::ELAPSED) {
let b = &mut slf.buckets[entry.bucket as usize];
b.entries.remove(entry.bucket_entry);
if b.entries.is_empty() {
slf.occupied[b.lvl] &= b.bit_n;
}
}
let bucket = &mut slf.buckets[idx];
let bucket_entry = bucket.add_entry(entry.no);
entry.bucket = idx as u16;
entry.bucket_entry = bucket_entry;
entry.flags = TimerEntryFlags::empty();
slf.occupied[bucket.lvl] |= bucket.bit;
bucket_expiry
};
// Check whether new bucket expire earlier
if bucket_expiry < slf.next_expiry {
slf.next_expiry = bucket_expiry;
if !slf.flags.contains(Flags::DRIVER_STARTED) {
slf.flags.insert(Flags::DRIVER_STARTED);
drop(slf);
TimerDriver::start(inner);
} else {
slf.flags.insert(Flags::NEEDS_RECALC);
slf.driver.wake();
}
}
}
fn remove_timer(&mut self, handle: usize) {
let entry = self.timers.remove(handle);
if !entry.flags.contains(TimerEntryFlags::ELAPSED) {
let b = &mut self.buckets[entry.bucket as usize];
b.entries.remove(entry.bucket_entry);
if b.entries.is_empty() {
self.occupied[b.lvl] &= b.bit_n;
}
}
}
// Find next expiration bucket
fn next_pending_bucket(&mut self) -> Option<u64> {
let mut clk = self.elapsed;
let mut next = u64::MAX;
for lvl in 0..LVL_DEPTH {
let lvl_clk = clk & LVL_CLK_MASK;
let occupied = self.occupied[lvl as usize];
let pos = if occupied == 0 {
-1
} else {
let zeros = occupied
.rotate_right((clk & LVL_MASK) as u32)
.trailing_zeros() as usize;
zeros as isize
};
if pos >= 0 {
let tmp = (clk + pos as u64) << lvl_shift(lvl as u64);
if tmp < next {
next = tmp
}
// If the next expiration happens before we reach
// the next level, no need to check further.
if (pos as u64) <= ((LVL_CLK_DIV - lvl_clk) & LVL_CLK_MASK) {
break;
}
}
let adj = if lvl_clk == 0 { 0 } else { 1 };
clk >>= LVL_CLK_SHIFT;
clk += adj;
}
if next < u64::MAX {
Some(next)
} else {
None
}
}
// Get instant of the next expiry
fn next_expiry(&mut self) -> time::Instant {
let millis = to_millis(self.next_expiry - self.elapsed);
time::Instant::now() + time::Duration::from_millis(millis)
}
fn execute_expired_timers(&mut self, instant: time::Instant) {
let mut clk = self.next_expiry;
self.elapsed = self.next_expiry;
self.elapsed_instant = instant;
for lvl in 0..LVL_DEPTH {
let idx = (clk & LVL_MASK) + lvl * LVL_SIZE;
let b = &mut self.buckets[idx as usize];
if !b.entries.is_empty() {
self.occupied[b.lvl] &= b.bit_n;
for no in b.entries.drain() {
if let Some(timer) = self.timers.get_mut(no) {
timer.complete();
}
}
}
// Is it time to look at the next level?
if (clk & LVL_CLK_MASK) != 0 {
break;
}
// Shift clock for the next level granularity
clk >>= LVL_CLK_SHIFT;
}
}
fn calc_wheel_index(&self, expires: u64, delta: u64) -> (usize, u64) {
if delta < lvl_start(1) {
Self::calc_index(expires, 0)
} else if delta < lvl_start(2) {
Self::calc_index(expires, 1)
} else if delta < lvl_start(3) {
Self::calc_index(expires, 2)
} else if delta < lvl_start(4) {
Self::calc_index(expires, 3)
} else if delta < lvl_start(5) {
Self::calc_index(expires, 4)
} else if delta < lvl_start(6) {
Self::calc_index(expires, 5)
} else if delta < lvl_start(7) {
Self::calc_index(expires, 6)
} else if delta < lvl_start(8) {
Self::calc_index(expires, 7)
} else {
// Force expire obscene large timeouts to expire at the
// capacity limit of the wheel.
if delta >= WHEEL_TIMEOUT_CUTOFF {
Self::calc_index(self.elapsed + WHEEL_TIMEOUT_MAX, LVL_DEPTH - 1)
} else {
Self::calc_index(expires, LVL_DEPTH - 1)
}
}
}
// Helper function to calculate the bucket index and bucket expiration
fn calc_index(expires2: u64, lvl: u64) -> (usize, u64) {
/*
* The timer wheel has to guarantee that a timer does not fire
* early. Early expiry can happen due to:
* - Timer is armed at the edge of a tick
* - Truncation of the expiry time in the outer wheel levels
*
* Round up with level granularity to prevent this.
*/
let expires = (expires2 + lvl_gran(lvl)) >> lvl_shift(lvl);
(
(lvl_offs(lvl) + (expires & LVL_MASK)) as usize,
expires << lvl_shift(lvl),
)
}
}
#[derive(Debug)]
struct Bucket {
lvl: usize,
offs: u64,
bit: u64,
bit_n: u64,
entries: Slab<usize>,
}
impl Bucket {
fn add_entry(&mut self, no: usize) -> usize {
self.entries.insert(no)
}
}
impl Bucket {
fn new(lvl: usize, offs: usize) -> Self {
let bit = 1 << (offs as u64);
Bucket {
lvl,
bit,
offs: offs as u64,
bit_n: !bit,
entries: Slab::default(),
}
}
}
bitflags::bitflags! {
pub struct TimerEntryFlags: u8 {
const ELAPSED = 0b0000_0001;
}
}
#[derive(Debug)]
struct TimerEntry {
flags: TimerEntryFlags,
no: usize,
bucket: u16,
bucket_entry: usize,
task: LocalWaker,
}
impl TimerEntry {
fn complete(&mut self) {
if !self.flags.contains(TimerEntryFlags::ELAPSED) {
self.flags.insert(TimerEntryFlags::ELAPSED);
self.task.wake();
}
}
}
pin_project_lite::pin_project! {
struct TimerDriver {
inner: Rc<RefCell<TimerInner>>,
#[pin]
sleep: Sleep,
}
}
impl TimerDriver {
fn start(cell: &Rc<RefCell<TimerInner>>) {
let mut inner = cell.borrow_mut();
inner.flags.insert(Flags::TIMER_ACTIVE);
crate::rt::spawn(TimerDriver {
inner: cell.clone(),
sleep: sleep_until(inner.next_expiry()),
});
}
}
impl Future for TimerDriver {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
let mut inner = this.inner.borrow_mut();
inner.driver.register(cx.waker());
if inner.flags.contains(Flags::NEEDS_RECALC) {
inner.flags.remove(Flags::NEEDS_RECALC);
inner.flags.insert(Flags::TIMER_ACTIVE);
this.sleep.reset(inner.next_expiry());
drop(inner);
return self.poll(cx);
} else if inner.flags.contains(Flags::TIMER_ACTIVE)
&& this.sleep.poll(cx).is_ready()
{
drop(inner);
this = self.as_mut().project();
let mut inner = this.inner.borrow_mut();
let instant = this.sleep.deadline();
inner.execute_expired_timers(instant);
if let Some(next_expiry) = inner.next_pending_bucket() {
inner.next_expiry = next_expiry;
inner.flags.insert(Flags::TIMER_ACTIVE);
this.sleep.reset(inner.next_expiry());
drop(inner);
return self.poll(cx);
} else {
inner.next_expiry = u64::MAX;
inner.flags.remove(Flags::TIMER_ACTIVE);
}
}
Poll::Pending
}
}

View file

@ -129,7 +129,7 @@ mod tests {
}
fn call(&self, _: ()) -> Self::Future {
let fut = crate::rt::time::sleep(self.0);
let fut = crate::time::sleep_duration(self.0);
Box::pin(async move {
let _ = fut.await;
Ok::<_, ()>(())

View file

@ -1,8 +1,8 @@
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::{cell::RefCell, convert::Infallible, future::Future, marker, pin::Pin};
use std::{cell::Cell, convert::Infallible, marker};
use crate::rt::time::{sleep_until, Sleep};
use crate::time::{sleep_duration, Sleep};
use crate::{util::Ready, Service, ServiceFactory};
use super::time::{LowResTime, LowResTimeService};
@ -72,31 +72,26 @@ where
pub struct KeepAliveService<R, E, F> {
f: F,
ka: Duration,
dur: Duration,
time: LowResTimeService,
inner: RefCell<Inner>,
sleep: Sleep,
expire: Cell<Instant>,
_t: marker::PhantomData<(R, E)>,
}
struct Inner {
delay: Pin<Box<Sleep>>,
expire: Instant,
}
impl<R, E, F> KeepAliveService<R, E, F>
where
F: Fn() -> E,
{
pub fn new(ka: Duration, time: LowResTimeService, f: F) -> Self {
let expire = time.now() + ka;
pub fn new(dur: Duration, time: LowResTimeService, f: F) -> Self {
let expire = Cell::new(time.now() + dur);
KeepAliveService {
f,
ka,
dur,
time,
inner: RefCell::new(Inner {
expire,
delay: Box::pin(sleep_until(expire)),
}),
expire,
sleep: sleep_duration(dur),
_t: marker::PhantomData,
}
}
@ -112,17 +107,15 @@ where
type Future = Ready<R, E>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut inner = self.inner.borrow_mut();
match Pin::new(&mut inner.delay).poll(cx) {
match self.sleep.poll_elapsed(cx) {
Poll::Ready(_) => {
let now = self.time.now();
if inner.expire <= now {
if self.expire.get() <= now {
Poll::Ready(Err((self.f)()))
} else {
let expire = inner.expire;
inner.delay.as_mut().reset(expire);
let _ = Pin::new(&mut inner.delay).poll(cx);
let expire = self.expire.get() - Instant::now();
self.sleep.reset(expire.as_millis() as u64);
let _ = self.sleep.poll_elapsed(cx);
Poll::Ready(Ok(()))
}
}
@ -131,7 +124,7 @@ where
}
fn call(&self, req: R) -> Self::Future {
self.inner.borrow_mut().expire = self.time.now() + self.ka;
self.expire.set(self.time.now() + self.dur);
Ready::Ok(req)
}
}
@ -139,8 +132,8 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::rt::time::sleep;
use crate::service::{Service, ServiceFactory};
use crate::time::sleep;
use crate::util::lazy;
#[derive(Debug, PartialEq)]
@ -160,7 +153,7 @@ mod tests {
assert_eq!(service.call(1usize).await, Ok(1usize));
assert!(lazy(|cx| service.poll_ready(cx)).await.is_ready());
sleep(Duration::from_millis(500)).await;
sleep(500).await;
assert_eq!(
lazy(|cx| service.poll_ready(cx)).await,
Poll::Ready(Err(TestErr))

View file

@ -161,11 +161,11 @@ where
#[cfg(test)]
mod tests {
use std::{cell::Cell, rc::Rc, time::Duration};
use std::{cell::Cell, rc::Rc};
use super::*;
use crate::util::{next, ByteString, BytesMut};
use crate::{channel::mpsc, codec::Encoder, rt::time::sleep, ws};
use crate::{channel::mpsc, codec::Encoder, time::sleep, ws};
#[crate::rt_test]
async fn test_basic() {
@ -200,7 +200,7 @@ mod tests {
assert_eq!(data, b"\x81\x04test".as_ref());
drop(tx);
sleep(Duration::from_millis(10)).await;
sleep(10).await;
assert!(next(&mut rx).await.is_none());
assert_eq!(counter.get(), 1);

View file

@ -1,9 +1,9 @@
use std::task::{Context, Poll};
use std::time::{self, Duration, Instant};
use std::{cell::RefCell, convert::Infallible, rc::Rc};
use std::{cell::RefCell, convert::Infallible, convert::TryInto, rc::Rc};
use crate::rt::time::sleep;
use crate::service::{Service, ServiceFactory};
use crate::time::sleep;
use crate::util::Ready;
#[derive(Clone, Debug)]
@ -11,12 +11,19 @@ pub struct LowResTime(Rc<RefCell<Inner>>);
#[derive(Debug)]
struct Inner {
resolution: Duration,
resolution: u64,
current: Option<Instant>,
}
impl Inner {
fn new(resolution: Duration) -> Self {
Inner::from_millis(resolution.as_millis().try_into().unwrap_or_else(|_| {
log::error!("Duration is too large {:?}", resolution);
1 << 31
}))
}
fn from_millis(resolution: u64) -> Self {
Inner {
resolution,
current: None,
@ -25,10 +32,16 @@ impl Inner {
}
impl LowResTime {
/// Create new timer service
pub fn with(resolution: Duration) -> LowResTime {
LowResTime(Rc::new(RefCell::new(Inner::new(resolution))))
}
/// Create new timer service
pub fn from_millis(resolution: u64) -> LowResTime {
LowResTime(Rc::new(RefCell::new(Inner::from_millis(resolution))))
}
pub fn timer(&self) -> LowResTimeService {
LowResTimeService(self.0.clone())
}
@ -36,7 +49,7 @@ impl LowResTime {
impl Default for LowResTime {
fn default() -> Self {
LowResTime(Rc::new(RefCell::new(Inner::new(Duration::from_secs(1)))))
LowResTime(Rc::new(RefCell::new(Inner::from_millis(1000))))
}
}
@ -109,12 +122,21 @@ pub struct SystemTime(Rc<RefCell<SystemTimeInner>>);
#[derive(Debug)]
struct SystemTimeInner {
resolution: Duration,
resolution: u64,
current: Option<time::SystemTime>,
}
impl SystemTimeInner {
fn new(resolution: Duration) -> Self {
SystemTimeInner::from_millis(resolution.as_millis().try_into().unwrap_or_else(
|_| {
log::error!("Duration is too large {:?}", resolution);
1 << 31
},
))
}
fn from_millis(resolution: u64) -> Self {
SystemTimeInner {
resolution,
current: None,
@ -126,10 +148,18 @@ impl SystemTimeInner {
pub struct SystemTimeService(Rc<RefCell<SystemTimeInner>>);
impl SystemTimeService {
/// Create new system time service
pub fn with(resolution: Duration) -> SystemTimeService {
SystemTimeService(Rc::new(RefCell::new(SystemTimeInner::new(resolution))))
}
/// Create new system time service, set resolution in millis
pub fn from_millis(resolution: u64) -> SystemTimeService {
SystemTimeService(Rc::new(RefCell::new(SystemTimeInner::from_millis(
resolution,
))))
}
/// Get current time. This function has to be called from
/// future's poll method, otherwise it panics.
pub fn now(&self) -> time::SystemTime {
@ -157,7 +187,7 @@ impl SystemTimeService {
#[cfg(test)]
mod tests {
use super::*;
use crate::util::lazy;
use crate::{time::sleep, util::lazy};
use std::time::{Duration, SystemTime};
#[crate::rt_test]
@ -196,7 +226,7 @@ mod tests {
#[crate::rt_test]
async fn system_time_service_time_updates_after_resolution_interval() {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(300);
let wait_time = 300;
let time_service = SystemTimeService::with(resolution);
@ -212,7 +242,7 @@ mod tests {
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
assert!(second_time - first_time >= wait_time);
assert!(second_time - first_time >= Duration::from_millis(wait_time));
}
/// State Under Test: `LowResTimeService::now()` updates returned value every resolution period.
@ -222,7 +252,7 @@ mod tests {
#[crate::rt_test]
async fn lowres_time_service_time_updates_after_resolution_interval() {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(300);
let wait_time = 300;
let time_service = LowResTimeService::with(resolution);
let first_time = time_service.now();
@ -230,6 +260,6 @@ mod tests {
sleep(wait_time).await;
let second_time = time_service.now();
assert!(second_time - first_time >= wait_time);
assert!(second_time - first_time >= Duration::from_millis(wait_time));
}
}

View file

@ -2,20 +2,21 @@
//!
//! If the response does not complete within the specified timeout, the response
//! will be aborted.
use std::{fmt, future::Future, marker, pin::Pin, task::Context, task::Poll, time};
use std::{
convert::TryInto, fmt, future::Future, marker, pin::Pin, task::Context, task::Poll,
time,
};
use crate::rt::time::{sleep, Sleep};
use crate::service::{IntoService, Service, Transform};
use crate::time::Sleep;
use crate::util::Either;
const ZERO: time::Duration = time::Duration::from_millis(0);
/// Applies a timeout to requests.
///
/// Timeout transform is disabled if timeout is set to 0
#[derive(Debug)]
pub struct Timeout<E = ()> {
timeout: time::Duration,
timeout: u64,
_t: marker::PhantomData<E>,
}
@ -69,7 +70,7 @@ impl<E: PartialEq> PartialEq for TimeoutError<E> {
impl Timeout {
pub fn new(timeout: time::Duration) -> Self {
Timeout {
timeout,
timeout: timeout.as_millis() as u64,
_t: marker::PhantomData,
}
}
@ -77,7 +78,10 @@ impl Timeout {
impl Clone for Timeout {
fn clone(&self) -> Self {
Timeout::new(self.timeout)
Timeout {
timeout: self.timeout,
_t: marker::PhantomData,
}
}
}
@ -99,7 +103,7 @@ where
#[derive(Debug, Clone)]
pub struct TimeoutService<S> {
service: S,
timeout: time::Duration,
timeout: u64,
}
impl<S> TimeoutService<S>
@ -107,6 +111,16 @@ where
S: Service,
{
pub fn new<U>(timeout: time::Duration, service: U) -> Self
where
U: IntoService<S>,
{
TimeoutService {
timeout: timeout.as_millis().try_into().unwrap(),
service: service.into_service(),
}
}
pub fn from_millis<U>(timeout: u64, service: U) -> Self
where
U: IntoService<S>,
{
@ -137,14 +151,14 @@ where
}
fn call(&self, request: S::Request) -> Self::Future {
if self.timeout == ZERO {
if self.timeout == 0 {
Either::Right(TimeoutServiceResponse2 {
fut: self.service.call(request),
})
} else {
Either::Left(TimeoutServiceResponse {
fut: self.service.call(request),
sleep: Box::pin(sleep(self.timeout)),
sleep: Sleep::new(self.timeout),
})
}
}
@ -157,7 +171,7 @@ pin_project_lite::pin_project! {
pub struct TimeoutServiceResponse<T: Service> {
#[pin]
fut: T::Future,
sleep: Pin<Box<Sleep>>,
sleep: Sleep,
}
}
@ -245,7 +259,7 @@ mod tests {
}
fn call(&self, _: ()) -> Self::Future {
let fut = crate::rt::time::sleep(self.0);
let fut = crate::time::sleep_duration(self.0);
Box::pin(async move {
let _ = fut.await;
Ok::<_, SrvError>(())

View file

@ -848,6 +848,7 @@ mod tests {
assert_eq!(resp.status(), StatusCode::OK);
}
#[cfg(feature = "url")]
#[crate::rt_test]
async fn test_external_resource() {
let srv = init_service(

View file

@ -151,6 +151,7 @@ mod tests {
assert_eq!(resp.status(), StatusCode::OK);
}
#[cfg(feature = "url")]
#[crate::rt_test]
async fn test_configure_external_resource() {
let srv = init_service(

View file

@ -575,11 +575,9 @@ impl<Err: ErrorRenderer> ServiceFactory for ResourceEndpoint<Err> {
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::http::header::{self, HeaderValue};
use crate::http::{Method, StatusCode};
use crate::rt::time::sleep;
use crate::time::sleep;
use crate::web::middleware::DefaultHeaders;
use crate::web::request::WebRequest;
use crate::web::test::{call_service, init_service, TestRequest};
@ -661,7 +659,7 @@ mod tests {
async fn test_to() {
let srv =
init_service(App::new().service(web::resource("/test").to(|| async {
sleep(Duration::from_millis(100)).await;
sleep(100).await;
HttpResponse::Ok()
})))
.await;

View file

@ -274,10 +274,8 @@ array_routes!(12, a, b, c, d, e, f, g, h, i, j, k, l);
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::http::{Method, StatusCode};
use crate::rt::time::sleep;
use crate::time::sleep;
use crate::util::Bytes;
use crate::web::test::{call_service, init_service, read_body, TestRequest};
use crate::web::{self, error, App, DefaultError, HttpResponse};
@ -299,16 +297,16 @@ mod tests {
)
}),
web::post().to(|| async {
sleep(Duration::from_millis(100)).await;
sleep(100).await;
HttpResponse::Created()
}),
web::delete().to(|| async {
sleep(Duration::from_millis(100)).await;
sleep(100).await;
Err::<HttpResponse, _>(error::ErrorBadRequest("err"))
}),
]))
.service(web::resource("/json").route(web::get().to(|| async {
sleep(Duration::from_millis(25)).await;
sleep(25).await;
web::types::Json(MyObject {
name: "test".to_string(),
})

View file

@ -1303,6 +1303,7 @@ mod tests {
assert_eq!(resp.status(), StatusCode::OK);
}
#[cfg(feature = "url")]
#[crate::rt_test]
async fn test_url_for_external() {
let srv = init_service(App::new().service(web::scope("/app").configure(|s| {
@ -1330,6 +1331,7 @@ mod tests {
assert_eq!(body, &b"https://youtube.com/watch/xxxxxx"[..]);
}
#[cfg(feature = "url")]
#[crate::rt_test]
async fn test_url_for_nested() {
let srv = init_service(App::new().service(web::scope("/a").service(

View file

@ -18,11 +18,10 @@ use crate::http::header::{HeaderName, HeaderValue, CONTENT_TYPE};
use crate::http::test::TestRequest as HttpTestRequest;
use crate::http::{HttpService, Method, Payload, Request, StatusCode, Uri, Version};
use crate::router::{Path, ResourceDef};
use crate::rt::{time::sleep, System};
use crate::server::Server;
use crate::util::{next, Bytes, BytesMut, Extensions, Ready};
use crate::{
map_config, IntoService, IntoServiceFactory, Service, ServiceFactory, Stream,
map_config, rt::System, server::Server, time::sleep, IntoService,
IntoServiceFactory, Service, ServiceFactory, Stream,
};
use crate::web::config::AppConfig;
@ -735,8 +734,8 @@ where
Connector::default()
.lifetime(time::Duration::from_secs(0))
.keep_alive(time::Duration::from_millis(30000))
.timeout(time::Duration::from_millis(30000))
.disconnect_timeout(time::Duration::from_millis(3000))
.timeout(30_000)
.disconnect_timeout(3_000)
.openssl(builder.build())
.finish()
}
@ -744,14 +743,14 @@ where
{
Connector::default()
.lifetime(time::Duration::from_secs(0))
.timeout(time::Duration::from_millis(30000))
.timeout(30_000)
.finish()
}
};
Client::build()
.connector(connector)
.timeout(time::Duration::from_millis(30000))
.timeout(30_000)
.finish()
};
@ -949,7 +948,7 @@ impl TestServer {
pub async fn stop(self) {
self.server.stop(true).await;
self.system.stop();
sleep(time::Duration::from_millis(100)).await;
sleep(100).await;
}
}
@ -1223,7 +1222,7 @@ mod tests {
))
.finish(),
)
.timeout(time::Duration::from_millis(30000))
.timeout(30)
.finish();
let url = format!("https://localhost:{}/", srv.addr.port());

View file

@ -58,12 +58,7 @@ async fn test_simple() {
let bytes = response.body().await.unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
let mut response = srv
.post("/")
.timeout(Duration::from_secs(30))
.send()
.await
.unwrap();
let mut response = srv.post("/").timeout(30).send().await.unwrap();
assert!(response.status().is_success());
// read response
@ -168,7 +163,7 @@ async fn test_form() {
async fn test_timeout() {
let srv = test::server(|| {
App::new().service(web::resource("/").route(web::to(|| async {
ntex::rt::time::sleep(Duration::from_millis(200)).await;
ntex::time::sleep(2000).await;
HttpResponse::Ok().body(STR)
})))
});
@ -178,13 +173,10 @@ async fn test_timeout() {
ntex::connect::Connector::new()
.map(|sock| (sock, ntex::http::Protocol::Http1)),
)
.timeout(Duration::from_secs(15))
.timeout(15_000)
.finish();
let client = Client::build()
.connector(connector)
.timeout(Duration::from_millis(50))
.finish();
let client = Client::build().connector(connector).timeout(1).finish();
let request = client.get(srv.url("/")).send();
match request.await {
@ -197,18 +189,13 @@ async fn test_timeout() {
async fn test_timeout_override() {
let srv = test::server(|| {
App::new().service(web::resource("/").route(web::to(|| async {
ntex::rt::time::sleep(Duration::from_millis(200)).await;
ntex::time::sleep(2000).await;
HttpResponse::Ok().body(STR)
})))
});
let client = Client::build()
.timeout(Duration::from_millis(50000))
.finish();
let request = client
.get(srv.url("/"))
.timeout(Duration::from_millis(50))
.send();
let client = Client::build().timeout(50).finish();
let request = client.get(srv.url("/")).timeout(1).send();
match request.await {
Err(SendRequestError::Timeout) => (),
_ => panic!(),
@ -237,7 +224,7 @@ async fn test_connection_reuse() {
)
});
let client = Client::build().timeout(Duration::from_secs(10)).finish();
let client = Client::build().timeout(10).finish();
// req 1
let request = client.get(srv.url("/")).send();
@ -275,7 +262,7 @@ async fn test_connection_force_close() {
)
});
let client = Client::build().timeout(Duration::from_secs(10)).finish();
let client = Client::build().timeout(10).finish();
// req 1
let request = client.get(srv.url("/")).force_close().send();
@ -313,7 +300,7 @@ async fn test_connection_server_close() {
)
});
let client = Client::build().timeout(Duration::from_secs(10)).finish();
let client = Client::build().timeout(10).finish();
// req 1
let request = client.get(srv.url("/")).send();
@ -353,7 +340,7 @@ async fn test_connection_wait_queue() {
});
let client = Client::build()
.timeout(Duration::from_secs(30))
.timeout(30)
.connector(Connector::default().limit(1).finish())
.finish();
@ -401,7 +388,7 @@ async fn test_connection_wait_queue_force_close() {
});
let client = Client::build()
.timeout(Duration::from_secs(30))
.timeout(30)
.connector(Connector::default().limit(1).finish())
.finish();
@ -454,7 +441,7 @@ async fn test_no_decompress() {
})))
});
let client = Client::build().timeout(Duration::from_secs(30)).finish();
let client = Client::build().timeout(30).finish();
let mut res = client
.get(srv.url("/"))
@ -613,11 +600,7 @@ async fn test_client_brotli_encoding_large_random() {
assert_eq!(bytes, Bytes::from(data.clone()));
// frozen request
let request = srv
.post("/")
.timeout(Duration::from_secs(30))
.freeze()
.unwrap();
let request = srv.post("/").timeout(30).freeze().unwrap();
assert_eq!(request.get_method(), http::Method::POST);
assert_eq!(request.get_uri(), srv.url("/").as_str());
let mut response = request.send_body(data.clone()).await.unwrap();
@ -655,11 +638,7 @@ async fn test_client_brotli_encoding_large_random() {
assert_eq!(bytes, Bytes::from(data.clone()));
// frozen request
let request = srv
.post("/")
.timeout(Duration::from_secs(30))
.freeze()
.unwrap();
let request = srv.post("/").timeout(30).freeze().unwrap();
let mut response = request
.send_stream(once(ok::<_, JsonPayloadError>(Bytes::from(data.clone()))))
.await
@ -847,7 +826,7 @@ async fn client_read_until_eof() {
// client request
let req = Client::build()
.timeout(Duration::from_secs(30))
.timeout(30)
.finish()
.get(format!("http://{}/", addr).as_str());
let mut response = req.send().await.unwrap();

View file

@ -1,7 +1,6 @@
#![cfg(feature = "openssl")]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use futures::future::ok;
use open_ssl::ssl::{SslAcceptor, SslConnector, SslFiletype, SslMethod, SslVerifyMode};
@ -70,10 +69,7 @@ async fn test_connection_reuse_h2() {
.finish();
// req 1
let request = client
.get(srv.surl("/"))
.timeout(Duration::from_secs(10))
.send();
let request = client.get(srv.surl("/")).timeout(10).send();
let response = request.await.unwrap();
assert!(response.status().is_success());

View file

@ -1,7 +1,6 @@
#![cfg(feature = "rustls")]
use std::fs::File;
use std::io::{self, BufReader};
use std::time::Duration;
use futures::future::{self, err, ok};
use futures::stream::{once, Stream, StreamExt};
@ -151,14 +150,14 @@ async fn test_h2_content_length() {
for i in 0..1 {
let req = srv
.srequest(Method::GET, &format!("/{}", i))
.timeout(Duration::from_secs(30))
.timeout(30)
.send();
let response = req.await.unwrap();
assert_eq!(response.headers().get(&header), None);
let req = srv
.srequest(Method::HEAD, &format!("/{}", i))
.timeout(Duration::from_secs(100))
.timeout(100)
.send();
let response = req.await.unwrap();
assert_eq!(response.headers().get(&header), None);
@ -167,7 +166,7 @@ async fn test_h2_content_length() {
for i in 1..3 {
let req = srv
.srequest(Method::GET, &format!("/{}", i))
.timeout(Duration::from_secs(30))
.timeout(30)
.send();
let response = req.await.unwrap();
assert_eq!(response.headers().get(&header), Some(&value));

View file

@ -47,11 +47,7 @@ async fn test_run() {
use ntex::http::client;
let client = client::Client::build()
.connector(
client::Connector::default()
.timeout(Duration::from_millis(100))
.finish(),
)
.connector(client::Connector::default().timeout(100_000).finish())
.finish();
let host = format!("http://{}", addr);
@ -90,10 +86,10 @@ fn client() -> ntex::http::client::Client {
.map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e));
ntex::http::client::Client::build()
.timeout(Duration::from_millis(30000))
.timeout(30)
.connector(
ntex::http::client::Connector::default()
.timeout(Duration::from_millis(30000))
.timeout(30_000)
.openssl(builder.build())
.finish(),
)

View file

@ -1,7 +1,6 @@
use std::io::{self, Read, Write};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use brotli2::write::{BrotliDecoder, BrotliEncoder};
use derive_more::Display;
@ -883,7 +882,7 @@ async fn test_reading_deflate_encoding_large_random_rustls() {
// client request
let req = srv
.post("/")
.timeout(Duration::from_millis(10000))
.timeout(10)
.header(CONTENT_ENCODING, "deflate")
.send_stream(TestBody::new(Bytes::from(enc), 1024));
@ -934,7 +933,7 @@ async fn test_reading_deflate_encoding_large_random_rustls_h1() {
// client request
let req = srv
.post("/")
.timeout(Duration::from_millis(10000))
.timeout(10)
.header(CONTENT_ENCODING, "deflate")
.send_stream(TestBody::new(Bytes::from(enc), 1024));
@ -985,7 +984,7 @@ async fn test_reading_deflate_encoding_large_random_rustls_h2() {
// client request
let req = srv
.post("/")
.timeout(Duration::from_millis(10000))
.timeout(10)
.header(CONTENT_ENCODING, "deflate")
.send_stream(TestBody::new(Bytes::from(enc), 1024));