rename Duration to Millis

This commit is contained in:
Nikolay Kim 2021-08-28 20:24:24 +06:00
parent 3c056c49b8
commit 86894230d5
29 changed files with 321 additions and 184 deletions

View file

@ -7,8 +7,12 @@ mod builder;
mod runtime;
mod system;
mod time;
#[doc(hidden)]
pub mod time;
pub mod time_driver {
pub use super::time::*;
}
pub use self::arbiter::Arbiter;
pub use self::builder::{Builder, SystemRunner};

View file

@ -4,7 +4,9 @@
* Add timer service
* Add helper time types Duration, Seconds
* Add helper time types Millis and Seconds
* Add sleep, interval, timeout helpers
* Use ntex-rt 0.3

View file

@ -1,13 +1,14 @@
//! Framed transport dispatcher
use std::{
cell::Cell, cell::RefCell, future::Future, pin::Pin, rc::Rc, task::Context,
task::Poll, time::Duration, time::Instant,
task::Poll, time, time::Instant,
};
use crate::codec::{AsyncRead, AsyncWrite, Decoder, Encoder};
use crate::framed::{DispatchItem, Read, ReadTask, State, Timer, Write, WriteTask};
use crate::service::{IntoService, Service};
use crate::{time::Seconds, util::Either};
use crate::time::Seconds;
use crate::util::Either;
type Response<U> = <U as Encoder>::Item;
@ -38,7 +39,7 @@ where
st: Cell<DispatcherState>,
state: State,
timer: Timer,
ka_timeout: u16,
ka_timeout: Seconds,
ka_updated: Cell<Instant>,
error: Cell<Option<S::Error>>,
shared: Rc<DispatcherShared<S, U>>,
@ -117,10 +118,10 @@ where
timer: Timer,
) -> Self {
let updated = timer.now();
let ka_timeout: u16 = 30;
let ka_timeout = Seconds(30);
// register keepalive timer
let expire = updated + Duration::from_secs(ka_timeout as u64);
let expire = updated + time::Duration::from(ka_timeout);
timer.register(expire, expire, &state);
Dispatcher {
@ -142,19 +143,18 @@ where
}
}
/// Set keep-alive timeout in seconds.
/// Set keep-alive timeout.
///
/// To disable timeout set value to 0.
///
/// By default keep-alive timeout is set to 30 seconds.
pub fn keepalive_timeout(mut self, timeout: u16) -> Self {
pub fn keepalive_timeout(mut self, timeout: Seconds) -> Self {
// register keepalive timer
let prev = self.inner.ka_updated.get() + self.inner.ka();
if timeout == 0 {
let prev = self.inner.ka_updated.get() + time::Duration::from(self.inner.ka());
if timeout.is_zero() {
self.inner.timer.unregister(prev, &self.inner.state);
} else {
let expire =
self.inner.ka_updated.get() + Duration::from_secs(timeout as u64);
let expire = self.inner.ka_updated.get() + time::Duration::from(timeout);
self.inner.timer.register(expire, prev, &self.inner.state);
}
self.inner.ka_timeout = timeout;
@ -458,12 +458,12 @@ where
}
}
fn ka(&self) -> Duration {
Duration::from_secs(self.ka_timeout as u64)
fn ka(&self) -> Seconds {
self.ka_timeout
}
fn ka_enabled(&self) -> bool {
self.ka_timeout > 0
self.ka_timeout.non_zero()
}
/// check keepalive timeout
@ -483,7 +483,7 @@ where
if self.ka_enabled() {
let updated = self.timer.now();
if updated != self.ka_updated.get() {
let ka = self.ka();
let ka = time::Duration::from(self.ka());
self.timer.register(
updated + ka,
self.ka_updated.get() + ka,
@ -497,8 +497,10 @@ where
/// unregister keep-alive timer
fn unregister_keepalive(&self) {
if self.ka_enabled() {
self.timer
.unregister(self.ka_updated.get() + self.ka(), &self.state);
self.timer.unregister(
self.ka_updated.get() + time::Duration::from(self.ka()),
&self.state,
);
}
}
}
@ -507,6 +509,7 @@ where
mod tests {
use rand::Rng;
use std::sync::{atomic::AtomicBool, atomic::Ordering::Relaxed, Arc, Mutex};
use std::time::Duration;
use crate::codec::BytesCodec;
use crate::testing::Io;
@ -533,7 +536,7 @@ mod tests {
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
let timer = Timer::default();
let ka_timeout = 1;
let ka_timeout = Seconds(1);
let ka_updated = timer.now();
let state = State::new();
let io = Rc::new(RefCell::new(io));
@ -769,7 +772,10 @@ mod tests {
}),
);
crate::rt::spawn(async move {
let _ = disp.keepalive_timeout(0).keepalive_timeout(1).await;
let _ = disp
.keepalive_timeout(Seconds::ZERO)
.keepalive_timeout(Seconds(1))
.await;
});
state.set_disconnect_timeout(Seconds(1));

View file

@ -1,19 +1,19 @@
use std::{cell::RefCell, collections::BTreeMap, rc::Rc, time::Instant};
use crate::framed::State;
use crate::time::{sleep, Duration};
use crate::time::{sleep, Millis};
use crate::util::HashSet;
pub struct Timer(Rc<RefCell<Inner>>);
struct Inner {
resolution: Duration,
resolution: Millis,
current: Option<Instant>,
notifications: BTreeMap<Instant, HashSet<State>>,
}
impl Inner {
fn new(resolution: Duration) -> Self {
fn new(resolution: Millis) -> Self {
Inner {
resolution,
current: None,
@ -39,13 +39,13 @@ impl Clone for Timer {
impl Default for Timer {
fn default() -> Self {
Timer::new(Duration::from_millis(1_000))
Timer::new(Millis::ONE_SEC)
}
}
impl Timer {
/// Create new timer with resolution in milliseconds
pub fn new(resolution: Duration) -> Timer {
pub fn new(resolution: Millis) -> Timer {
Timer(Rc::new(RefCell::new(Inner::new(resolution))))
}

View file

@ -11,7 +11,7 @@ use crate::http::request::Request;
use crate::http::response::Response;
use crate::http::service::HttpService;
use crate::service::{boxed, IntoService, IntoServiceFactory, Service, ServiceFactory};
use crate::time::{Duration, Seconds};
use crate::time::{Millis, Seconds};
/// A http service builder
///
@ -19,9 +19,9 @@ use crate::time::{Duration, Seconds};
/// builder-like pattern.
pub struct HttpServiceBuilder<T, S, X = ExpectHandler, U = UpgradeHandler<T>> {
keep_alive: KeepAlive,
client_timeout: Duration,
client_timeout: Millis,
client_disconnect: Seconds,
handshake_timeout: Duration,
handshake_timeout: Millis,
lw: u16,
read_hw: u16,
write_hw: u16,
@ -37,9 +37,9 @@ impl<T, S> HttpServiceBuilder<T, S, ExpectHandler, UpgradeHandler<T>> {
pub fn new() -> Self {
HttpServiceBuilder {
keep_alive: KeepAlive::Timeout(Seconds(5)),
client_timeout: Duration::from_secs(3),
client_timeout: Millis::from_secs(3),
client_disconnect: Seconds(3),
handshake_timeout: Duration::from_secs(5),
handshake_timeout: Millis::from_secs(5),
lw: 1024,
read_hw: 8 * 1024,
write_hw: 8 * 1024,

View file

@ -2,7 +2,7 @@ use std::{convert::TryFrom, fmt, rc::Rc};
use crate::http::error::HttpError;
use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue};
use crate::{time, Service};
use crate::{time::Millis, Service};
use super::connect::ConnectorWrapper;
use super::error::ConnectError;
@ -33,7 +33,7 @@ impl ClientBuilder {
max_redirects: 10,
config: ClientConfig {
headers: HeaderMap::new(),
timeout: time::Duration::from_millis(5_000),
timeout: Millis(5_000),
connector: Box::new(ConnectorWrapper(Connector::default().finish())),
},
}
@ -55,14 +55,14 @@ impl ClientBuilder {
///
/// Request timeout is the total time before a response must be received.
/// Default value is 5 seconds.
pub fn timeout<T: Into<time::Duration>>(mut self, timeout: T) -> Self {
pub fn timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
self.config.timeout = timeout.into();
self
}
/// Disable request timeout.
pub fn disable_timeout(mut self) -> Self {
self.config.timeout = time::Duration::ZERO;
self.config.timeout = Millis::ZERO;
self
}

View file

@ -4,7 +4,7 @@ use crate::codec::{AsyncRead, AsyncWrite};
use crate::connect::{Connect as TcpConnect, Connector as TcpConnector};
use crate::http::{Protocol, Uri};
use crate::service::{apply_fn, boxed, Service};
use crate::time;
use crate::time::{Millis, Seconds};
use crate::util::timeout::{TimeoutError, TimeoutService};
use crate::util::{Either, Ready};
@ -30,7 +30,6 @@ type BoxedConnector =
/// construction that finishes by calling the `.finish()` method.
///
/// ```rust,no_run
/// use std::time::Duration;
/// use ntex::http::client::Connector;
///
/// let connector = Connector::default()
@ -38,10 +37,10 @@ type BoxedConnector =
/// .finish();
/// ```
pub struct Connector {
timeout: time::Duration,
timeout: Millis,
conn_lifetime: Duration,
conn_keep_alive: Duration,
disconnect_timeout: time::Duration,
disconnect_timeout: Millis,
limit: usize,
connector: BoxedConnector,
ssl_connector: Option<BoxedConnector>,
@ -65,10 +64,10 @@ impl Connector {
.map_err(ConnectError::from),
),
ssl_connector: None,
timeout: time::Duration::from_millis(1_000),
timeout: Millis(1_000),
conn_lifetime: Duration::from_secs(75),
conn_keep_alive: Duration::from_secs(15),
disconnect_timeout: time::Duration::from_millis(3_000),
disconnect_timeout: Millis(3_000),
limit: 100,
};
@ -104,7 +103,7 @@ impl Connector {
///
/// i.e. max time to connect to remote host including dns name resolution.
/// Set to 1 second by default.
pub fn timeout<T: Into<time::Duration>>(mut self, timeout: T) -> Self {
pub fn timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
self.timeout = timeout.into();
self
}
@ -165,8 +164,8 @@ impl Connector {
/// the delay between repeated usages of the same connection
/// exceeds this period, the connection is closed.
/// Default keep-alive period is 15 seconds.
pub fn keep_alive(mut self, dur: Duration) -> Self {
self.conn_keep_alive = dur;
pub fn keep_alive(mut self, dur: Seconds) -> Self {
self.conn_keep_alive = dur.into();
self
}
@ -175,8 +174,8 @@ impl Connector {
/// Connection lifetime is max lifetime of any opened connection
/// until it is closed regardless of keep-alive period.
/// Default lifetime period is 75 seconds.
pub fn lifetime(mut self, dur: Duration) -> Self {
self.conn_lifetime = dur;
pub fn lifetime(mut self, dur: Seconds) -> Self {
self.conn_lifetime = dur.into();
self
}
@ -188,7 +187,7 @@ impl Connector {
/// To disable timeout set value to 0.
///
/// By default disconnect timeout is set to 3 seconds.
pub fn disconnect_timeout<T: Into<time::Duration>>(mut self, timeout: T) -> Self {
pub fn disconnect_timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
self.disconnect_timeout = timeout.into();
self
}
@ -266,7 +265,7 @@ impl Connector {
fn connector(
connector: BoxedConnector,
timeout: time::Duration,
timeout: Millis,
) -> impl Service<
Request = Connect,
Response = (Box<dyn Io>, Protocol),

View file

@ -4,7 +4,7 @@ use crate::http::body::Body;
use crate::http::error::HttpError;
use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue};
use crate::http::{Method, RequestHead, RequestHeadType, Uri};
use crate::{time, util::Bytes, Stream};
use crate::{time::Millis, util::Bytes, Stream};
use super::sender::SendClientRequest;
use super::ClientConfig;
@ -16,7 +16,7 @@ pub struct FrozenClientRequest {
pub(super) head: Rc<RequestHead>,
pub(super) addr: Option<net::SocketAddr>,
pub(super) response_decompress: bool,
pub(super) timeout: time::Duration,
pub(super) timeout: Millis,
pub(super) config: Rc<ClientConfig>,
}

View file

@ -45,7 +45,7 @@ pub use self::test::TestResponse;
use crate::http::error::HttpError;
use crate::http::{HeaderMap, Method, RequestHead, Uri};
use crate::time;
use crate::time::Millis;
use self::connect::{Connect as InnerConnect, ConnectorWrapper};
@ -78,7 +78,7 @@ pub struct Client(Rc<ClientConfig>);
pub(self) struct ClientConfig {
pub(self) connector: Box<dyn InnerConnect>,
pub(self) headers: HeaderMap,
pub(self) timeout: time::Duration,
pub(self) timeout: Millis,
}
impl Default for Client {
@ -86,7 +86,7 @@ impl Default for Client {
Client(Rc::new(ClientConfig {
connector: Box::new(ConnectorWrapper(Connector::default().finish())),
headers: HeaderMap::new(),
timeout: time::Duration::from_millis(5_000),
timeout: Millis(5_000),
}))
}
}

View file

@ -11,7 +11,7 @@ use crate::http::Protocol;
use crate::rt::spawn;
use crate::service::Service;
use crate::task::LocalWaker;
use crate::time::{self as time, sleep, Sleep};
use crate::time::{sleep, Millis, Sleep};
use crate::util::{poll_fn, Bytes, HashMap};
use super::connection::{ConnectionType, IoConnection};
@ -47,7 +47,7 @@ where
connector: T,
conn_lifetime: Duration,
conn_keep_alive: Duration,
disconnect_timeout: time::Duration,
disconnect_timeout: Millis,
limit: usize,
) -> Self {
let connector = Rc::new(connector);
@ -180,7 +180,7 @@ struct AvailableConnection<Io> {
pub(super) struct Inner<Io> {
conn_lifetime: Duration,
conn_keep_alive: Duration,
disconnect_timeout: time::Duration,
disconnect_timeout: Millis,
limit: usize,
acquired: usize,
available: HashMap<Key, VecDeque<AvailableConnection<Io>>>,
@ -373,7 +373,7 @@ impl<T> CloseConnection<T>
where
T: AsyncWrite + AsyncRead + Unpin + 'static,
{
fn spawn(io: T, timeout: time::Duration) {
fn spawn(io: T, timeout: Millis) {
spawn(Self {
io,
shutdown: false,
@ -603,10 +603,9 @@ mod tests {
use std::{cell::RefCell, convert::TryFrom, rc::Rc};
use super::*;
use crate::time::{self as time, sleep};
use crate::{
http::client::Connection, http::Uri, service::fn_service, testing::Io,
util::lazy,
time::sleep, util::lazy,
};
#[crate::rt_test]
@ -622,7 +621,7 @@ mod tests {
}),
Duration::from_secs(10),
Duration::from_secs(10),
time::Duration::ZERO,
Millis::ZERO,
1,
)
.clone();

View file

@ -10,7 +10,7 @@ use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue};
use crate::http::{
uri, ConnectionType, Method, RequestHead, RequestHeadType, Uri, Version,
};
use crate::{time, util::Bytes, Stream};
use crate::{time::Millis, util::Bytes, Stream};
use super::error::{FreezeRequestError, InvalidUrl};
use super::frozen::FrozenClientRequest;
@ -51,7 +51,7 @@ pub struct ClientRequest {
#[cfg(feature = "cookie")]
cookies: Option<CookieJar>,
response_decompress: bool,
timeout: time::Duration,
timeout: Millis,
config: Rc<ClientConfig>,
}
@ -69,7 +69,7 @@ impl ClientRequest {
addr: None,
#[cfg(feature = "cookie")]
cookies: None,
timeout: time::Duration::ZERO,
timeout: Millis::ZERO,
response_decompress: true,
}
.method(method)
@ -313,7 +313,7 @@ impl ClientRequest {
///
/// Request timeout is the total time before a response must be received.
/// Default value is 5 seconds.
pub fn timeout<T: Into<time::Duration>>(mut self, timeout: T) -> Self {
pub fn timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
self.timeout = timeout.into();
self
}

View file

@ -7,8 +7,8 @@ use crate::http::body::{Body, BodyStream};
use crate::http::error::HttpError;
use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue};
use crate::http::RequestHeadType;
use crate::time::{sleep, Sleep};
use crate::{time, util::Bytes, Stream};
use crate::time::{sleep, Millis, Sleep};
use crate::{util::Bytes, Stream};
#[cfg(feature = "compress")]
use crate::http::encoding::Decoder;
@ -58,7 +58,7 @@ impl SendClientRequest {
pub(crate) fn new(
send: Pin<Box<dyn Future<Output = Result<ClientResponse, SendRequestError>>>>,
response_decompress: bool,
timeout: time::Duration,
timeout: Millis,
) -> SendClientRequest {
SendClientRequest::Fut(send, timeout.map(sleep), response_decompress)
}
@ -129,7 +129,7 @@ impl RequestHeadType {
self,
addr: Option<net::SocketAddr>,
response_decompress: bool,
mut timeout: time::Duration,
mut timeout: Millis,
config: &ClientConfig,
body: B,
) -> SendClientRequest
@ -151,7 +151,7 @@ impl RequestHeadType {
mut self,
addr: Option<net::SocketAddr>,
response_decompress: bool,
timeout: time::Duration,
timeout: Millis,
config: &ClientConfig,
value: &T,
) -> SendClientRequest {
@ -178,7 +178,7 @@ impl RequestHeadType {
mut self,
addr: Option<net::SocketAddr>,
response_decompress: bool,
timeout: time::Duration,
timeout: Millis,
config: &ClientConfig,
value: &T,
) -> SendClientRequest {
@ -208,7 +208,7 @@ impl RequestHeadType {
self,
addr: Option<net::SocketAddr>,
response_decompress: bool,
timeout: time::Duration,
timeout: Millis,
config: &ClientConfig,
stream: S,
) -> SendClientRequest
@ -229,7 +229,7 @@ impl RequestHeadType {
self,
addr: Option<net::SocketAddr>,
response_decompress: bool,
timeout: time::Duration,
timeout: Millis,
config: &ClientConfig,
) -> SendClientRequest {
self.send_body(addr, response_decompress, timeout, config, Body::Empty)

View file

@ -3,7 +3,7 @@ use std::{cell::Cell, cell::RefCell, ptr::copy_nonoverlapping, rc::Rc, time};
use crate::framed::Timer;
use crate::http::{Request, Response};
use crate::service::boxed::BoxService;
use crate::time::{sleep, Duration, Seconds, Sleep};
use crate::time::{sleep, Millis, Seconds, Sleep};
use crate::util::BytesMut;
#[derive(Debug, PartialEq, Clone, Copy)]
@ -37,12 +37,12 @@ impl From<Option<usize>> for KeepAlive {
pub struct ServiceConfig(pub(super) Rc<Inner>);
pub(super) struct Inner {
pub(super) keep_alive: Duration,
pub(super) client_timeout: Duration,
pub(super) keep_alive: Millis,
pub(super) client_timeout: Millis,
pub(super) client_disconnect: Seconds,
pub(super) ka_enabled: bool,
pub(super) timer: DateService,
pub(super) ssl_handshake_timeout: Duration,
pub(super) ssl_handshake_timeout: Millis,
pub(super) timer_h1: Timer,
pub(super) lw: u16,
pub(super) read_hw: u16,
@ -59,9 +59,9 @@ impl Default for ServiceConfig {
fn default() -> Self {
Self::new(
KeepAlive::Timeout(Seconds(5)),
Duration::ZERO,
Millis::ZERO,
Seconds::ZERO,
Duration::from_millis(5_000),
Millis(5_000),
1024,
8 * 1024,
8 * 1024,
@ -73,23 +73,19 @@ impl ServiceConfig {
/// Create instance of `ServiceConfig`
pub fn new(
keep_alive: KeepAlive,
client_timeout: Duration,
client_timeout: Millis,
client_disconnect: Seconds,
ssl_handshake_timeout: Duration,
ssl_handshake_timeout: Millis,
lw: u16,
read_hw: u16,
write_hw: u16,
) -> ServiceConfig {
let (keep_alive, ka_enabled) = match keep_alive {
KeepAlive::Timeout(val) => (Duration::from(val), true),
KeepAlive::Os => (Duration::ZERO, true),
KeepAlive::Disabled => (Duration::ZERO, false),
};
let keep_alive = if ka_enabled {
keep_alive
} else {
Duration::ZERO
KeepAlive::Timeout(val) => (Millis::from(val), true),
KeepAlive::Os => (Millis::ZERO, true),
KeepAlive::Disabled => (Millis::ZERO, false),
};
let keep_alive = if ka_enabled { keep_alive } else { Millis::ZERO };
ServiceConfig(Rc::new(Inner {
keep_alive,
@ -112,8 +108,8 @@ pub(super) struct DispatcherConfig<T, S, X, U> {
pub(super) service: S,
pub(super) expect: X,
pub(super) upgrade: Option<U>,
pub(super) keep_alive: Duration,
pub(super) client_timeout: Duration,
pub(super) keep_alive: Millis,
pub(super) client_timeout: Millis,
pub(super) client_disconnect: Seconds,
pub(super) ka_enabled: bool,
pub(super) timer: DateService,

View file

@ -13,7 +13,7 @@ use crate::http::request::Request;
use crate::http::response::Response;
use crate::rt::net::TcpStream;
use crate::{
pipeline_factory, time::Duration, IntoServiceFactory, Service, ServiceFactory,
pipeline_factory, time::Millis, IntoServiceFactory, Service, ServiceFactory,
};
use super::codec::Codec;
@ -29,7 +29,7 @@ pub struct H1Service<T, S, B, X = ExpectHandler, U = UpgradeHandler<T>> {
on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
on_request: RefCell<Option<OnRequest<T>>>,
#[allow(dead_code)]
handshake_timeout: Duration,
handshake_timeout: Millis,
_t: marker::PhantomData<(T, B)>,
}

View file

@ -12,7 +12,7 @@ use crate::http::helpers::DataFactory;
use crate::http::request::Request;
use crate::http::response::Response;
use crate::rt::net::TcpStream;
use crate::time::Duration;
use crate::time::Millis;
use crate::util::Bytes;
use crate::{
fn_factory, fn_service, pipeline_factory, IntoServiceFactory, Service,
@ -27,7 +27,7 @@ pub struct H2Service<T, S, B> {
cfg: ServiceConfig,
on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
#[allow(dead_code)]
handshake_timeout: Duration,
handshake_timeout: Millis,
_t: PhantomData<(T, B)>,
}

View file

@ -9,7 +9,7 @@ use crate::codec::{AsyncRead, AsyncWrite};
use crate::framed::State;
use crate::rt::net::TcpStream;
use crate::service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory};
use crate::time::{Duration, Seconds};
use crate::time::{Millis, Seconds};
use crate::util::Bytes;
use super::body::MessageBody;
@ -62,9 +62,9 @@ where
pub fn new<F: IntoServiceFactory<S>>(service: F) -> Self {
let cfg = ServiceConfig::new(
KeepAlive::Timeout(Seconds(5)),
Duration::from_millis(5_000),
Millis(5_000),
Seconds::ZERO,
Duration::from_millis(5_000),
Millis(5_000),
1024,
8 * 1024,
8 * 1024,

View file

@ -7,7 +7,7 @@ use log::{error, info};
use socket2::{Domain, SockAddr, Socket, Type};
use crate::rt::{net::TcpStream, spawn, System};
use crate::{time::sleep, time::Duration, util::join_all};
use crate::{time::sleep, time::Millis, util::join_all};
use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::config::{ConfiguredService, ServiceConfig};
@ -29,7 +29,7 @@ pub struct ServerBuilder {
sockets: Vec<(Token, String, Listener)>,
accept: AcceptLoop,
exit: bool,
shutdown_timeout: Duration,
shutdown_timeout: Millis,
no_signals: bool,
cmd: Receiver<ServerCommand>,
server: Server,
@ -57,7 +57,7 @@ impl ServerBuilder {
accept: AcceptLoop::new(server.clone()),
backlog: 2048,
exit: false,
shutdown_timeout: Duration::from_secs(30),
shutdown_timeout: Millis::from_secs(30),
no_signals: false,
cmd: rx,
notify: Vec::new(),
@ -123,7 +123,7 @@ impl ServerBuilder {
/// dropped.
///
/// By default shutdown timeout sets to 30 seconds.
pub fn shutdown_timeout<T: Into<Duration>>(mut self, timeout: T) -> Self {
pub fn shutdown_timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
self.shutdown_timeout = timeout.into();
self
}

View file

@ -6,7 +6,7 @@ pub use tokio_openssl::SslStream;
use crate::codec::{AsyncRead, AsyncWrite};
use crate::service::{Service, ServiceFactory};
use crate::time::{sleep, Duration, Sleep};
use crate::time::{sleep, Millis, Sleep};
use crate::util::{counter::Counter, counter::CounterGuard, Ready};
use super::MAX_SSL_ACCEPT_COUNTER;
@ -16,7 +16,7 @@ use super::MAX_SSL_ACCEPT_COUNTER;
/// `openssl` feature enables `Acceptor` type
pub struct Acceptor<T: AsyncRead + AsyncWrite> {
acceptor: SslAcceptor,
timeout: Duration,
timeout: Millis,
io: PhantomData<T>,
}
@ -25,7 +25,7 @@ impl<T: AsyncRead + AsyncWrite> Acceptor<T> {
pub fn new(acceptor: SslAcceptor) -> Self {
Acceptor {
acceptor,
timeout: Duration::from_millis(5_000),
timeout: Millis(5_000),
io: PhantomData,
}
}
@ -33,7 +33,7 @@ impl<T: AsyncRead + AsyncWrite> Acceptor<T> {
/// Set handshake timeout.
///
/// Default is set to 5 seconds.
pub fn timeout<U: Into<Duration>>(mut self, timeout: U) -> Self {
pub fn timeout<U: Into<Millis>>(mut self, timeout: U) -> Self {
self.timeout = timeout.into();
self
}
@ -76,7 +76,7 @@ where
pub struct AcceptorService<T> {
acceptor: SslAcceptor,
conns: Counter,
timeout: Duration,
timeout: Millis,
io: PhantomData<T>,
}

View file

@ -9,7 +9,7 @@ pub use webpki_roots::TLS_SERVER_ROOTS;
use crate::codec::{AsyncRead, AsyncWrite};
use crate::service::{Service, ServiceFactory};
use crate::time::{sleep, Duration, Sleep};
use crate::time::{sleep, Millis, Sleep};
use crate::util::counter::{Counter, CounterGuard};
use crate::util::Ready;
@ -19,7 +19,7 @@ use super::MAX_SSL_ACCEPT_COUNTER;
///
/// `rust-tls` feature enables `RustlsAcceptor` type
pub struct Acceptor<T> {
timeout: Duration,
timeout: Millis,
config: Arc<ServerConfig>,
io: PhantomData<T>,
}
@ -29,7 +29,7 @@ impl<T: AsyncRead + AsyncWrite> Acceptor<T> {
pub fn new(config: ServerConfig) -> Self {
Acceptor {
config: Arc::new(config),
timeout: Duration::from_millis(5_000),
timeout: Millis(5_000),
io: PhantomData,
}
}
@ -37,7 +37,7 @@ impl<T: AsyncRead + AsyncWrite> Acceptor<T> {
/// Set handshake timeout.
///
/// Default is set to 5 seconds.
pub fn timeout<U: Into<Duration>>(mut self, timeout: U) -> Self {
pub fn timeout<U: Into<Millis>>(mut self, timeout: U) -> Self {
self.timeout = timeout.into();
self
}
@ -80,7 +80,7 @@ pub struct AcceptorService<T> {
acceptor: TlsAcceptor,
io: PhantomData<T>,
conns: Counter,
timeout: Duration,
timeout: Millis,
}
impl<T: AsyncRead + AsyncWrite + Unpin> Service for AcceptorService<T> {

View file

@ -7,7 +7,7 @@ use log::error;
use crate::service::{Service, ServiceFactory};
use crate::util::{counter::CounterGuard, Ready};
use crate::{rt::spawn, time::Duration};
use crate::{rt::spawn, time::Millis};
use super::socket::{FromStream, Stream};
use super::Token;
@ -17,7 +17,7 @@ pub(super) enum ServerMessage {
/// New stream
Connect(Stream),
/// Gracefull shutdown in millis
Shutdown(Duration),
Shutdown(Millis),
/// Force shutdown
ForceShutdown,
}

View file

@ -6,7 +6,7 @@ use async_oneshot as oneshot;
use futures_core::Stream as FutStream;
use crate::rt::{spawn, Arbiter};
use crate::time::{sleep, Duration, Sleep};
use crate::time::{sleep, Millis, Sleep};
use crate::util::{counter::Counter, join_all};
use super::accept::{AcceptNotify, Command};
@ -130,7 +130,7 @@ pub(super) struct Worker {
conns: Counter,
factories: Vec<Box<dyn InternalServiceFactory>>,
state: WorkerState,
shutdown_timeout: Duration,
shutdown_timeout: Millis,
}
struct WorkerService {
@ -161,7 +161,7 @@ impl Worker {
idx: usize,
factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability,
shutdown_timeout: Duration,
shutdown_timeout: Millis,
) -> WorkerClient {
let (tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded();
@ -191,7 +191,7 @@ impl Worker {
rx2: Receiver<StopCommand>,
factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability,
shutdown_timeout: Duration,
shutdown_timeout: Millis,
) -> Result<Worker, ()> {
availability.set(false);
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker {
@ -595,7 +595,7 @@ mod tests {
"127.0.0.1:8080".parse().unwrap(),
)],
avail.clone(),
Duration::from_millis(5_000),
Millis(5_000),
)
.await
.unwrap();
@ -667,7 +667,7 @@ mod tests {
"127.0.0.1:8080".parse().unwrap(),
)],
avail.clone(),
Duration::from_millis(5_000),
Millis(5_000),
)
.await
.unwrap();

View file

@ -5,7 +5,7 @@ use std::{future::Future, pin::Pin, task, task::Poll};
mod types;
mod wheel;
pub use self::types::{Duration, Seconds};
pub use self::types::{Millis, Seconds};
pub use self::wheel::TimerHandle;
/// Waits until `duration` has elapsed.
@ -14,8 +14,17 @@ pub use self::wheel::TimerHandle;
/// operates at 16.5 millisecond granularity and should not be used for tasks that
/// require high-resolution timers.
#[inline]
pub fn sleep<T: Into<Duration>>(dur: T) -> Sleep {
Sleep::new(dur.into().0)
pub fn sleep<T: Into<Millis>>(dur: T) -> Sleep {
Sleep::new(dur.into())
}
/// Creates new [`Interval`] that yields with interval of `period`.
///
/// An interval will tick indefinitely. At any time, the [`Interval`] value can
/// be dropped. This cancels the interval.
#[inline]
pub fn interval<T: Into<Millis>>(period: T) -> Interval {
Interval::new(period.into())
}
/// Require a `Future` to complete before the specified duration has elapsed.
@ -27,9 +36,9 @@ pub fn sleep<T: Into<Duration>>(dur: T) -> Sleep {
pub fn timeout<T, U>(dur: U, future: T) -> Timeout<T>
where
T: Future,
U: Into<Duration>,
U: Into<Millis>,
{
Timeout::new_with_delay(future, Sleep::new(dur.into().0))
Timeout::new_with_delay(future, Sleep::new(dur.into()))
}
/// Future returned by [`sleep`](sleep).
@ -57,9 +66,9 @@ pub struct Sleep {
impl Sleep {
/// Create new sleep future
#[inline]
pub fn new(millis: u64) -> Sleep {
pub fn new(duration: Millis) -> Sleep {
Sleep {
hnd: TimerHandle::new(millis),
hnd: TimerHandle::new(duration.0),
}
}
@ -76,8 +85,8 @@ impl Sleep {
///
/// This function can be called both before and after the future has
/// completed.
pub fn reset(&self, millis: u64) {
self.hnd.reset(millis);
pub fn reset<T: Into<Millis>>(&self, millis: T) {
self.hnd.reset(millis.into().0);
}
#[inline]
@ -132,3 +141,51 @@ where
}
}
}
/// Interval returned by [`interval`]
///
/// This type allows you to wait on a sequence of instants with a certain
/// duration between each instant.
#[derive(Debug)]
pub struct Interval {
hnd: TimerHandle,
period: u64,
}
impl Interval {
/// Create new sleep future
#[inline]
pub fn new(period: Millis) -> Interval {
Interval {
hnd: TimerHandle::new(period.0),
period: period.0,
}
}
#[inline]
pub async fn tick(&self) {
crate::util::poll_fn(|cx| self.poll_tick(cx)).await;
}
#[inline]
pub fn poll_tick(&self, cx: &mut task::Context<'_>) -> Poll<()> {
if self.hnd.poll_elapsed(cx).is_ready() {
self.hnd.reset(self.period);
Poll::Ready(())
} else {
Poll::Pending
}
}
}
impl crate::Stream for Interval {
type Item = ();
#[inline]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Self::Item>> {
self.poll_tick(cx).map(|_| Some(()))
}
}

View file

@ -1,4 +1,4 @@
use std::convert::TryInto;
use std::{convert::TryInto, ops};
// /// A measurement of a monotonically nondecreasing clock. Opaque and useful only with Duration.
// ///
@ -16,23 +16,18 @@ use std::convert::TryInto;
/// A Duration type to represent a span of time.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Duration(pub(super) u64);
pub struct Millis(pub u64);
impl Duration {
impl Millis {
/// Zero milliseconds value
pub const ZERO: Duration = Duration(0);
pub const ZERO: Millis = Millis(0);
/// One second value
pub const ONE_SEC: Duration = Duration(1_000);
pub const ONE_SEC: Millis = Millis(1_000);
#[inline]
pub const fn from_secs(secs: u32) -> Duration {
Duration((secs as u64) * 1000)
}
#[inline]
pub const fn from_millis(millis: u64) -> Duration {
Duration(millis)
pub const fn from_secs(secs: u32) -> Millis {
Millis((secs as u64) * 1000)
}
#[inline]
@ -49,7 +44,7 @@ impl Duration {
#[inline]
pub fn map<F, R>(&self, f: F) -> Option<R>
where
F: FnOnce(Duration) -> R,
F: FnOnce(Millis) -> R,
{
if self.0 == 0 {
None
@ -59,30 +54,94 @@ impl Duration {
}
}
impl Default for Duration {
impl Default for Millis {
#[inline]
fn default() -> Duration {
Duration::ZERO
fn default() -> Millis {
Millis::ZERO
}
}
impl From<u64> for Duration {
impl ops::Add<Millis> for Millis {
type Output = Millis;
#[inline]
fn from(millis: u64) -> Duration {
Duration(millis)
fn add(self, other: Millis) -> Millis {
Millis(self.0.checked_add(other.0).unwrap_or(u64::MAX))
}
}
impl From<Seconds> for Duration {
impl ops::Add<Seconds> for Millis {
type Output = Millis;
#[inline]
fn from(s: Seconds) -> Duration {
Duration((s.0 as u64) * 1000)
#[allow(clippy::suspicious_arithmetic_impl)]
fn add(self, other: Seconds) -> Millis {
Millis(
self.0
.checked_add((other.0 as u64) * 1000)
.unwrap_or(u64::MAX),
)
}
}
impl From<std::time::Duration> for Duration {
impl ops::Add<std::time::Duration> for Millis {
type Output = Millis;
#[inline]
fn from(d: std::time::Duration) -> Duration {
fn add(self, other: std::time::Duration) -> Millis {
self + Millis::from(other)
}
}
impl ops::Add<Millis> for std::time::Duration {
type Output = std::time::Duration;
#[inline]
fn add(self, other: Millis) -> std::time::Duration {
self + Self::from(other)
}
}
impl From<u64> for Millis {
#[inline]
fn from(millis: u64) -> Millis {
Millis(millis)
}
}
impl From<u128> for Millis {
#[inline]
fn from(d: u128) -> Millis {
Self(d.try_into().unwrap_or_else(|_| {
log::error!("time Duration is too large {:?}", d);
1 << 31
}))
}
}
impl From<i32> for Millis {
#[inline]
fn from(d: i32) -> Millis {
let millis = if d < 0 {
log::error!("time Duration is negative {:?}", d);
0
} else {
d as u64
};
Self(millis)
}
}
impl From<Seconds> for Millis {
#[inline]
fn from(s: Seconds) -> Millis {
Millis((s.0 as u64) * 1000)
}
}
impl From<std::time::Duration> for Millis {
#[inline]
fn from(d: std::time::Duration) -> Millis {
Self(d.as_millis().try_into().unwrap_or_else(|_| {
log::error!("time Duration is too large {:?}", d);
1 << 31
@ -90,9 +149,9 @@ impl From<std::time::Duration> for Duration {
}
}
impl From<Duration> for std::time::Duration {
impl From<Millis> for std::time::Duration {
#[inline]
fn from(d: Duration) -> std::time::Duration {
fn from(d: Millis) -> std::time::Duration {
std::time::Duration::from_millis(d.0)
}
}
@ -113,6 +172,16 @@ impl Seconds {
Seconds(secs)
}
#[inline]
pub const fn checked_new(secs: usize) -> Seconds {
let secs = if (u16::MAX as usize) < secs {
u16::MAX
} else {
secs as u16
};
Seconds(secs)
}
#[inline]
pub const fn is_zero(self) -> bool {
self.0 == 0
@ -132,12 +201,12 @@ impl Seconds {
#[inline]
pub fn map<F, R>(&self, f: F) -> Option<R>
where
F: FnOnce(Duration) -> R,
F: FnOnce(Millis) -> R,
{
if self.0 == 0 {
None
} else {
Some(f(Duration::from(*self)))
Some(f(Millis::from(*self)))
}
}
}
@ -149,6 +218,15 @@ impl Default for Seconds {
}
}
impl ops::Add<Seconds> for Seconds {
type Output = Seconds;
#[inline]
fn add(self, other: Seconds) -> Seconds {
Seconds(self.0.checked_add(other.0).unwrap_or(u16::MAX))
}
}
impl From<Seconds> for std::time::Duration {
#[inline]
fn from(d: Seconds) -> std::time::Duration {

View file

@ -6,7 +6,7 @@ use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc, task, task::Poll, tim
use slab::Slab;
use crate::rt::time::{sleep_until, Sleep};
use crate::rt::time_driver::{sleep_until, Sleep};
use crate::task::LocalWaker;
// Clock divisor for the next level

View file

@ -3,7 +3,7 @@ use std::time::{self, Instant};
use std::{cell::RefCell, convert::Infallible, rc::Rc};
use crate::service::{Service, ServiceFactory};
use crate::time::{sleep, Duration};
use crate::time::{sleep, Millis};
use crate::util::Ready;
#[derive(Clone, Debug)]
@ -11,12 +11,12 @@ pub struct LowResTime(Rc<RefCell<Inner>>);
#[derive(Debug)]
struct Inner {
resolution: Duration,
resolution: Millis,
current: Option<Instant>,
}
impl Inner {
fn new(resolution: Duration) -> Self {
fn new(resolution: Millis) -> Self {
Inner {
resolution,
current: None,
@ -26,7 +26,7 @@ impl Inner {
impl LowResTime {
/// Create new timer service
pub fn new<T: Into<Duration>>(resolution: T) -> LowResTime {
pub fn new<T: Into<Millis>>(resolution: T) -> LowResTime {
LowResTime(Rc::new(RefCell::new(Inner::new(resolution.into()))))
}
@ -37,9 +37,7 @@ impl LowResTime {
impl Default for LowResTime {
fn default() -> Self {
LowResTime(Rc::new(RefCell::new(Inner::new(Duration::from_millis(
1000,
)))))
LowResTime(Rc::new(RefCell::new(Inner::new(Millis(1000)))))
}
}
@ -62,7 +60,7 @@ impl ServiceFactory for LowResTime {
pub struct LowResTimeService(Rc<RefCell<Inner>>);
impl LowResTimeService {
pub fn new<T: Into<Duration>>(resolution: T) -> LowResTimeService {
pub fn new<T: Into<Millis>>(resolution: T) -> LowResTimeService {
LowResTimeService(Rc::new(RefCell::new(Inner::new(resolution.into()))))
}
@ -112,12 +110,12 @@ pub struct SystemTime(Rc<RefCell<SystemTimeInner>>);
#[derive(Debug)]
struct SystemTimeInner {
resolution: Duration,
resolution: Millis,
current: Option<time::SystemTime>,
}
impl SystemTimeInner {
fn new(resolution: Duration) -> Self {
fn new(resolution: Millis) -> Self {
SystemTimeInner {
resolution,
current: None,
@ -130,7 +128,7 @@ pub struct SystemTimeService(Rc<RefCell<SystemTimeInner>>);
impl SystemTimeService {
/// Create new system time service
pub fn new<T: Into<Duration>>(resolution: T) -> SystemTimeService {
pub fn new<T: Into<Millis>>(resolution: T) -> SystemTimeService {
SystemTimeService(Rc::new(RefCell::new(SystemTimeInner::new(
resolution.into(),
))))

View file

@ -5,7 +5,7 @@
use std::{fmt, future::Future, marker, pin::Pin, task::Context, task::Poll};
use crate::service::{IntoService, Service, Transform};
use crate::time::{sleep, Duration, Sleep};
use crate::time::{sleep, Millis, Sleep};
use crate::util::Either;
/// Applies a timeout to requests.
@ -13,7 +13,7 @@ use crate::util::Either;
/// Timeout transform is disabled if timeout is set to 0
#[derive(Debug)]
pub struct Timeout<E = ()> {
timeout: Duration,
timeout: Millis,
_t: marker::PhantomData<E>,
}
@ -65,7 +65,7 @@ impl<E: PartialEq> PartialEq for TimeoutError<E> {
}
impl Timeout {
pub fn new<T: Into<Duration>>(timeout: T) -> Self {
pub fn new<T: Into<Millis>>(timeout: T) -> Self {
Timeout {
timeout: timeout.into(),
_t: marker::PhantomData,
@ -100,7 +100,7 @@ where
#[derive(Debug, Clone)]
pub struct TimeoutService<S> {
service: S,
timeout: Duration,
timeout: Millis,
}
impl<S> TimeoutService<S>
@ -109,7 +109,7 @@ where
{
pub fn new<T, U>(timeout: T, service: U) -> Self
where
T: Into<Duration>,
T: Into<Millis>,
U: IntoService<S>,
{
TimeoutService {

View file

@ -1,7 +1,7 @@
//! Various helpers for ntex applications to use during testing.
use std::{
convert::TryFrom, error::Error, fmt, net, net::SocketAddr, rc::Rc, sync::mpsc,
thread, time,
thread,
};
#[cfg(feature = "cookie")]
@ -733,8 +733,8 @@ where
.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e));
Connector::default()
.lifetime(time::Duration::from_secs(0))
.keep_alive(time::Duration::from_millis(30000))
.lifetime(Seconds::ZERO)
.keep_alive(Seconds(30))
.timeout(30_000)
.disconnect_timeout(3_000)
.openssl(builder.build())
@ -743,7 +743,7 @@ where
#[cfg(not(feature = "openssl"))]
{
Connector::default()
.lifetime(time::Duration::from_secs(0))
.lifetime(Seconds::ZERO)
.timeout(30_000)
.finish()
}

View file

@ -2,7 +2,6 @@ use std::collections::HashMap;
use std::io::{Read, Write};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use brotli2::write::BrotliEncoder;
use coo_kie::Cookie;
@ -825,7 +824,7 @@ async fn client_read_until_eof() {
}
}
});
ntex::rt::time::sleep(Duration::from_millis(300)).await;
ntex::time::sleep(300).await;
// client request
let req = Client::build()

View file

@ -1,5 +1,4 @@
use std::io::{Read, Write};
use std::{io, net, thread, time::Duration};
use std::{io, io::Read, io::Write, net};
use futures::future::{self, ok, ready, FutureExt};
use futures::stream::{once, StreamExt};
@ -9,7 +8,7 @@ use ntex::http::test::server as test_server;
use ntex::http::{
body, header, HttpService, KeepAlive, Method, Request, Response, StatusCode,
};
use ntex::rt::time::sleep;
use ntex::time::sleep;
use ntex::{service::fn_service, time::Seconds, util::Bytes, web::error};
#[ntex::test]
@ -58,7 +57,7 @@ async fn test_expect_continue() {
let srv = test_server(|| {
HttpService::build()
.expect(fn_service(|req: Request| async move {
sleep(Duration::from_millis(20)).await;
sleep(20).await;
if req.head().uri.query() == Some("yes=") {
Ok(req)
} else {
@ -216,7 +215,7 @@ async fn test_http1_keepalive_timeout() {
let mut data = vec![0; 1024];
let _ = stream.read(&mut data);
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
thread::sleep(Duration::from_millis(1100));
sleep(1100).await;
let mut data = vec![0; 1024];
let res = stream.read(&mut data).unwrap();