use new time api

This commit is contained in:
Nikolay Kim 2021-02-24 13:48:33 +06:00
parent 819f5b2eaf
commit 2d8ce48456
24 changed files with 137 additions and 122 deletions

View file

@ -70,6 +70,7 @@ pub mod time {
pub use tokio::time::Instant; pub use tokio::time::Instant;
pub use tokio::time::{interval, interval_at, Interval}; pub use tokio::time::{interval, interval_at, Interval};
pub use tokio::time::{sleep, sleep_until, Sleep}; pub use tokio::time::{sleep, sleep_until, Sleep};
#[doc(hidden)]
pub use tokio::time::{ pub use tokio::time::{
sleep as delay_for, sleep_until as delay_until, Sleep as Delay, sleep as delay_for, sleep_until as delay_until, Sleep as Delay,
}; };

View file

@ -482,7 +482,7 @@ mod tests {
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use crate::codec::BytesCodec; use crate::codec::BytesCodec;
use crate::rt::time::delay_for; use crate::rt::time::sleep;
use crate::testing::Io; use crate::testing::Io;
use super::*; use super::*;
@ -550,7 +550,7 @@ mod tests {
server, server,
BytesCodec, BytesCodec,
crate::fn_service(|msg: DispatchItem<BytesCodec>| async move { crate::fn_service(|msg: DispatchItem<BytesCodec>| async move {
delay_for(Duration::from_millis(50)).await; sleep(Duration::from_millis(50)).await;
if let DispatchItem::Item(msg) = msg { if let DispatchItem::Item(msg) = msg {
Ok::<_, ()>(Some(msg.freeze())) Ok::<_, ()>(Some(msg.freeze()))
} else { } else {
@ -596,7 +596,7 @@ mod tests {
assert_eq!(buf, Bytes::from_static(b"test")); assert_eq!(buf, Bytes::from_static(b"test"));
st.close(); st.close();
delay_for(Duration::from_millis(200)).await; sleep(Duration::from_millis(200)).await;
assert!(client.is_server_dropped()); assert!(client.is_server_dropped());
} }
@ -624,7 +624,7 @@ mod tests {
let buf = client.read_any(); let buf = client.read_any();
assert_eq!(buf, Bytes::from_static(b"")); assert_eq!(buf, Bytes::from_static(b""));
delay_for(Duration::from_millis(25)).await; sleep(Duration::from_millis(25)).await;
// buffer should be flushed // buffer should be flushed
client.remote_buffer_cap(1024); client.remote_buffer_cap(1024);
@ -682,7 +682,7 @@ mod tests {
let buf = client.read_any(); let buf = client.read_any();
assert_eq!(buf, Bytes::from_static(b"")); assert_eq!(buf, Bytes::from_static(b""));
client.write("GET /test HTTP/1\r\n\r\n"); client.write("GET /test HTTP/1\r\n\r\n");
delay_for(Duration::from_millis(25)).await; sleep(Duration::from_millis(25)).await;
// buf must be consumed // buf must be consumed
assert_eq!(client.remote_buffer(|buf| buf.len()), 0); assert_eq!(client.remote_buffer(|buf| buf.len()), 0);
@ -692,11 +692,11 @@ mod tests {
assert_eq!(state.with_write_buf(|buf| buf.len()), 65536); assert_eq!(state.with_write_buf(|buf| buf.len()), 65536);
client.remote_buffer_cap(10240); client.remote_buffer_cap(10240);
delay_for(Duration::from_millis(50)).await; sleep(Duration::from_millis(50)).await;
assert_eq!(state.with_write_buf(|buf| buf.len()), 55296); assert_eq!(state.with_write_buf(|buf| buf.len()), 55296);
client.remote_buffer_cap(45056); client.remote_buffer_cap(45056);
delay_for(Duration::from_millis(50)).await; sleep(Duration::from_millis(50)).await;
assert_eq!(state.with_write_buf(|buf| buf.len()), 10240); assert_eq!(state.with_write_buf(|buf| buf.len()), 10240);
// backpressure disabled // backpressure disabled
@ -740,7 +740,7 @@ mod tests {
let buf = client.read().await.unwrap(); let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n")); assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"));
delay_for(Duration::from_millis(3100)).await; sleep(Duration::from_millis(3100)).await;
// write side must be closed, dispatcher should fail with keep-alive // write side must be closed, dispatcher should fail with keep-alive
let flags = state.flags(); let flags = state.flags();

View file

@ -3,7 +3,7 @@ use std::{cell::RefCell, collections::BTreeMap, rc::Rc, time::Duration, time::In
use futures::future::{ready, FutureExt}; use futures::future::{ready, FutureExt};
use crate::framed::State; use crate::framed::State;
use crate::rt::time::delay_for; use crate::rt::time::sleep;
use crate::util::HashSet; use crate::util::HashSet;
pub struct Timer(Rc<RefCell<Inner>>); pub struct Timer(Rc<RefCell<Inner>>);
@ -84,7 +84,7 @@ impl Timer {
b.resolution b.resolution
}; };
crate::rt::spawn(delay_for(interval).then(move |_| { crate::rt::spawn(sleep(interval).then(move |_| {
let empty = { let empty = {
let mut i = inner.borrow_mut(); let mut i = inner.borrow_mut();
let now = i.current.take().unwrap_or_else(Instant::now); let now = i.current.take().unwrap_or_else(Instant::now);

View file

@ -9,7 +9,7 @@ use http::uri::Authority;
use crate::channel::pool; use crate::channel::pool;
use crate::codec::{AsyncRead, AsyncWrite, ReadBuf}; use crate::codec::{AsyncRead, AsyncWrite, ReadBuf};
use crate::http::Protocol; use crate::http::Protocol;
use crate::rt::{spawn, time::delay_for, time::Delay}; use crate::rt::{spawn, time::sleep, time::Sleep};
use crate::service::Service; use crate::service::Service;
use crate::task::LocalWaker; use crate::task::LocalWaker;
use crate::util::{Bytes, HashMap}; use crate::util::{Bytes, HashMap};
@ -366,10 +366,13 @@ where
} }
} }
struct CloseConnection<T> { pin_project_lite::pin_project! {
io: T, struct CloseConnection<T> {
timeout: Option<Pin<Box<Delay>>>, io: T,
shutdown: bool, #[pin]
timeout: Option<Sleep>,
shutdown: bool,
}
} }
impl<T> CloseConnection<T> impl<T> CloseConnection<T>
@ -378,7 +381,7 @@ where
{ {
fn spawn(io: T, timeout: Duration) { fn spawn(io: T, timeout: Duration) {
let timeout = if timeout != ZERO { let timeout = if timeout != ZERO {
Some(Box::pin(delay_for(timeout))) Some(sleep(timeout))
} else { } else {
None None
}; };
@ -397,18 +400,18 @@ where
type Output = (); type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let this = self.get_mut(); let mut this = self.project();
// shutdown WRITE side // shutdown WRITE side
match Pin::new(&mut this.io).poll_shutdown(cx) { match Pin::new(&mut this.io).poll_shutdown(cx) {
Poll::Ready(Ok(())) => this.shutdown = true, Poll::Ready(Ok(())) => *this.shutdown = true,
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(Err(_)) => return Poll::Ready(()), Poll::Ready(Err(_)) => return Poll::Ready(()),
} }
// read until 0 or err // read until 0 or err
if let Some(ref mut timeout) = this.timeout { if let Some(timeout) = this.timeout.as_pin_mut() {
match Pin::new(timeout).poll(cx) { match timeout.poll(cx) {
Poll::Ready(_) => (), Poll::Ready(_) => (),
Poll::Pending => { Poll::Pending => {
let mut buf = [0u8; 512]; let mut buf = [0u8; 512];
@ -609,7 +612,7 @@ mod tests {
use super::*; use super::*;
use crate::http::client::Connection; use crate::http::client::Connection;
use crate::http::Uri; use crate::http::Uri;
use crate::rt::time::delay_for; use crate::rt::time::sleep;
use crate::service::fn_service; use crate::service::fn_service;
use crate::testing::Io; use crate::testing::Io;
@ -667,7 +670,7 @@ mod tests {
let mut fut = pool.call(req.clone()); let mut fut = pool.call(req.clone());
assert!(lazy(|cx| Pin::new(&mut fut).poll(cx)).await.is_pending()); assert!(lazy(|cx| Pin::new(&mut fut).poll(cx)).await.is_pending());
drop(fut); drop(fut);
delay_for(Duration::from_millis(50)).await; sleep(Duration::from_millis(50)).await;
pool.1.borrow_mut().check_availibility(); pool.1.borrow_mut().check_availibility();
assert!(pool.1.borrow().waiters.is_empty()); assert!(pool.1.borrow().waiters.is_empty());

View file

@ -14,7 +14,7 @@ use crate::http::body::{Body, BodyStream};
use crate::http::error::HttpError; use crate::http::error::HttpError;
use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue}; use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue};
use crate::http::RequestHeadType; use crate::http::RequestHeadType;
use crate::rt::time::{delay_for, Delay}; use crate::rt::time::{sleep, Sleep};
#[cfg(feature = "compress")] #[cfg(feature = "compress")]
use crate::http::encoding::Decoder; use crate::http::encoding::Decoder;
@ -54,7 +54,7 @@ impl From<PrepForSendingError> for SendRequestError {
pub enum SendClientRequest { pub enum SendClientRequest {
Fut( Fut(
Pin<Box<dyn Future<Output = Result<ClientResponse, SendRequestError>>>>, Pin<Box<dyn Future<Output = Result<ClientResponse, SendRequestError>>>>,
Option<Pin<Box<Delay>>>, Option<Pin<Box<Sleep>>>,
bool, bool,
), ),
Err(Option<SendRequestError>), Err(Option<SendRequestError>),
@ -66,7 +66,7 @@ impl SendClientRequest {
response_decompress: bool, response_decompress: bool,
timeout: Option<Duration>, timeout: Option<Duration>,
) -> SendClientRequest { ) -> SendClientRequest {
let delay = timeout.map(|d| Box::pin(delay_for(d))); let delay = timeout.map(|d| Box::pin(sleep(d)));
SendClientRequest::Fut(send, delay, response_decompress) SendClientRequest::Fut(send, delay, response_decompress)
} }
} }

View file

@ -5,7 +5,7 @@ use futures::{future, FutureExt};
use time::OffsetDateTime; use time::OffsetDateTime;
use crate::framed::Timer; use crate::framed::Timer;
use crate::rt::time::{delay_for, delay_until, Delay, Instant}; use crate::rt::time::{sleep, sleep_until, Instant, Sleep};
#[derive(Debug, PartialEq, Clone, Copy)] #[derive(Debug, PartialEq, Clone, Copy)]
/// Server keep-alive setting /// Server keep-alive setting
@ -127,10 +127,10 @@ impl<S, X, U> DispatcherConfig<S, X, U> {
self.ka_enabled self.ka_enabled
} }
/// Return keep-alive timer delay is configured. /// Return keep-alive timer Sleep is configured.
pub(super) fn keep_alive_timer(&self) -> Option<Delay> { pub(super) fn keep_alive_timer(&self) -> Option<Sleep> {
if self.keep_alive.as_secs() != 0 { if self.keep_alive.as_secs() != 0 {
Some(delay_until(self.timer.now() + self.keep_alive)) Some(sleep_until(self.timer.now() + self.keep_alive))
} else { } else {
None None
} }
@ -203,7 +203,7 @@ impl DateService {
// periodic date update // periodic date update
let s = self.clone(); let s = self.clone();
crate::rt::spawn(delay_for(Duration::from_millis(500)).then(move |_| { crate::rt::spawn(sleep(Duration::from_millis(500)).then(move |_| {
s.0.current.set(false); s.0.current.set(false);
future::ready(()) future::ready(())
})); }));

View file

@ -670,7 +670,7 @@ mod tests {
use crate::http::config::{DispatcherConfig, ServiceConfig}; use crate::http::config::{DispatcherConfig, ServiceConfig};
use crate::http::h1::{ClientCodec, ExpectHandler, UpgradeHandler}; use crate::http::h1::{ClientCodec, ExpectHandler, UpgradeHandler};
use crate::http::{body, Request, ResponseHead, StatusCode}; use crate::http::{body, Request, ResponseHead, StatusCode};
use crate::rt::time::delay_for; use crate::rt::time::sleep;
use crate::service::IntoService; use crate::service::IntoService;
use crate::testing::Io; use crate::testing::Io;
@ -735,11 +735,11 @@ mod tests {
client.write("GET /test HTTP/1\r\n\r\n"); client.write("GET /test HTTP/1\r\n\r\n");
let mut h1 = h1(server, |_| ok::<_, io::Error>(Response::Ok().finish())); let mut h1 = h1(server, |_| ok::<_, io::Error>(Response::Ok().finish()));
delay_for(time::Duration::from_millis(50)).await; sleep(time::Duration::from_millis(50)).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready()); assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready());
assert!(!h1.inner.state.is_open()); assert!(!h1.inner.state.is_open());
delay_for(time::Duration::from_millis(50)).await; sleep(time::Duration::from_millis(50)).await;
client client
.local_buffer(|buf| assert_eq!(&buf[..26], b"HTTP/1.1 400 Bad Request\r\n")); .local_buffer(|buf| assert_eq!(&buf[..26], b"HTTP/1.1 400 Bad Request\r\n"));
@ -788,7 +788,7 @@ mod tests {
}); });
client.write("GET /test HTTP/1.1\r\ncontent-length: 5\r\n\r\n"); client.write("GET /test HTTP/1.1\r\ncontent-length: 5\r\n\r\n");
delay_for(time::Duration::from_millis(50)).await; sleep(time::Duration::from_millis(50)).await;
client.write("xxxxx"); client.write("xxxxx");
let mut buf = client.read().await.unwrap(); let mut buf = client.read().await.unwrap();
@ -812,7 +812,7 @@ mod tests {
client.remote_buffer_cap(4096); client.remote_buffer_cap(4096);
let mut decoder = ClientCodec::default(); let mut decoder = ClientCodec::default();
spawn_h1(server, |_| async { spawn_h1(server, |_| async {
delay_for(time::Duration::from_millis(100)).await; sleep(time::Duration::from_millis(100)).await;
Ok::<_, io::Error>(Response::Ok().finish()) Ok::<_, io::Error>(Response::Ok().finish())
}); });
@ -824,7 +824,7 @@ mod tests {
client.write("GET /test HTTP/1.1\r\n\r\n"); client.write("GET /test HTTP/1.1\r\n\r\n");
client.write("GET /test HTTP/1.1\r\n\r\n"); client.write("GET /test HTTP/1.1\r\n\r\n");
delay_for(time::Duration::from_millis(50)).await; sleep(time::Duration::from_millis(50)).await;
client.write("GET /test HTTP/1.1\r\n\r\n"); client.write("GET /test HTTP/1.1\r\n\r\n");
let mut buf = client.read().await.unwrap(); let mut buf = client.read().await.unwrap();
@ -885,10 +885,10 @@ mod tests {
.collect::<String>(); .collect::<String>();
client.write("GET /test HTTP/1.1\r\nContent-Length: "); client.write("GET /test HTTP/1.1\r\nContent-Length: ");
client.write(data); client.write(data);
delay_for(time::Duration::from_millis(50)).await; sleep(time::Duration::from_millis(50)).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending()); assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
delay_for(time::Duration::from_millis(50)).await; sleep(time::Duration::from_millis(50)).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready()); assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready());
assert!(!h1.inner.state.is_open()); assert!(!h1.inner.state.is_open());
@ -911,13 +911,13 @@ mod tests {
let _ = pl.next().await.unwrap().unwrap(); let _ = pl.next().await.unwrap().unwrap();
m.store(true, Ordering::Relaxed); m.store(true, Ordering::Relaxed);
// sleep // sleep
delay_for(time::Duration::from_secs(999_999)).await; sleep(time::Duration::from_secs(999_999)).await;
Ok::<_, io::Error>(Response::Ok().finish()) Ok::<_, io::Error>(Response::Ok().finish())
} }
}); });
client.write("GET /test HTTP/1.1\r\nContent-Length: 1048576\r\n\r\n"); client.write("GET /test HTTP/1.1\r\nContent-Length: 1048576\r\n\r\n");
delay_for(time::Duration::from_millis(50)).await; sleep(time::Duration::from_millis(50)).await;
// buf must be consumed // buf must be consumed
assert_eq!(client.remote_buffer(|buf| buf.len()), 0); assert_eq!(client.remote_buffer(|buf| buf.len()), 0);
@ -927,7 +927,7 @@ mod tests {
(0..1_048_576).map(|_| rand::random::<u8>()).collect(); (0..1_048_576).map(|_| rand::random::<u8>()).collect();
client.write(random_bytes); client.write(random_bytes);
delay_for(time::Duration::from_millis(50)).await; sleep(time::Duration::from_millis(50)).await;
assert!(client.remote_buffer(|buf| buf.len()) > 1_048_576 - BUFFER_SIZE * 3); assert!(client.remote_buffer(|buf| buf.len()) > 1_048_576 - BUFFER_SIZE * 3);
assert!(mark.load(Ordering::Relaxed)); assert!(mark.load(Ordering::Relaxed));
} }
@ -969,7 +969,7 @@ mod tests {
// do not allow to write to socket // do not allow to write to socket
client.remote_buffer_cap(0); client.remote_buffer_cap(0);
client.write("GET /test HTTP/1.1\r\n\r\n"); client.write("GET /test HTTP/1.1\r\n\r\n");
delay_for(time::Duration::from_millis(50)).await; sleep(time::Duration::from_millis(50)).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending()); assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
// buf must be consumed // buf must be consumed
@ -982,7 +982,7 @@ mod tests {
assert_eq!(state.with_write_buf(|buf| buf.len()), 65629); assert_eq!(state.with_write_buf(|buf| buf.len()), 65629);
client.remote_buffer_cap(65536); client.remote_buffer_cap(65536);
delay_for(time::Duration::from_millis(50)).await; sleep(time::Duration::from_millis(50)).await;
assert_eq!(state.with_write_buf(|buf| buf.len()), 93); assert_eq!(state.with_write_buf(|buf| buf.len()), 93);
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending()); assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
@ -1022,7 +1022,7 @@ mod tests {
}); });
client.write("GET /test HTTP/1.1\r\n\r\n"); client.write("GET /test HTTP/1.1\r\n\r\n");
delay_for(time::Duration::from_millis(50)).await; sleep(time::Duration::from_millis(50)).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending()); assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
// http message must be consumed // http message must be consumed
@ -1034,7 +1034,7 @@ mod tests {
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending()); assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
client.close().await; client.close().await;
delay_for(time::Duration::from_millis(50)).await; sleep(time::Duration::from_millis(50)).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready()); assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready());
} }
} }

View file

@ -18,7 +18,7 @@ use crate::http::message::ResponseHead;
use crate::http::payload::Payload; use crate::http::payload::Payload;
use crate::http::request::Request; use crate::http::request::Request;
use crate::http::response::Response; use crate::http::response::Response;
use crate::rt::time::{Delay, Instant}; use crate::rt::time::{Instant, Sleep};
use crate::Service; use crate::Service;
const CHUNK_SIZE: usize = 16_384; const CHUNK_SIZE: usize = 16_384;
@ -31,7 +31,7 @@ pin_project_lite::pin_project! {
on_connect: Option<Box<dyn DataFactory>>, on_connect: Option<Box<dyn DataFactory>>,
peer_addr: Option<net::SocketAddr>, peer_addr: Option<net::SocketAddr>,
ka_expire: Instant, ka_expire: Instant,
ka_timer: Option<Delay>, ka_timer: Option<Sleep>,
_t: PhantomData<B>, _t: PhantomData<B>,
} }
} }
@ -48,7 +48,7 @@ where
config: Rc<DispatcherConfig<S, X, U>>, config: Rc<DispatcherConfig<S, X, U>>,
connection: Connection<T, Bytes>, connection: Connection<T, Bytes>,
on_connect: Option<Box<dyn DataFactory>>, on_connect: Option<Box<dyn DataFactory>>,
timeout: Option<Delay>, timeout: Option<Sleep>,
peer_addr: Option<net::SocketAddr>, peer_addr: Option<net::SocketAddr>,
) -> Self { ) -> Self {
// keep-alive timer // keep-alive timer

View file

@ -3,7 +3,7 @@ use std::{io, sync::mpsc as sync_mpsc, sync::Arc, thread, time::Duration};
use log::{error, info}; use log::{error, info};
use slab::Slab; use slab::Slab;
use crate::rt::time::{delay_until, Instant}; use crate::rt::time::{sleep_until, Instant};
use crate::rt::System; use crate::rt::System;
use super::socket::{Listener, SocketAddr}; use super::socket::{Listener, SocketAddr};
@ -415,7 +415,7 @@ impl Accept {
let notify = self.notify.clone(); let notify = self.notify.clone();
System::current().arbiter().spawn(Box::pin(async move { System::current().arbiter().spawn(Box::pin(async move {
delay_until(Instant::now() + Duration::from_millis(510)) sleep_until(Instant::now() + Duration::from_millis(510))
.await; .await;
notify.send(Command::Timer); notify.send(Command::Timer);
})); }));

View file

@ -12,7 +12,7 @@ use log::{error, info};
use socket2::{Domain, SockAddr, Socket, Type}; use socket2::{Domain, SockAddr, Socket, Type};
use crate::rt::net::TcpStream; use crate::rt::net::TcpStream;
use crate::rt::time::{delay_until, Instant}; use crate::rt::time::{sleep_until, Instant};
use crate::rt::{spawn, System}; use crate::rt::{spawn, System};
use super::accept::{AcceptLoop, AcceptNotify, Command}; use super::accept::{AcceptLoop, AcceptNotify, Command};
@ -385,7 +385,7 @@ impl ServerBuilder {
if exit { if exit {
spawn( spawn(
async { async {
delay_until( sleep_until(
Instant::now() Instant::now()
+ Duration::from_millis(300), + Duration::from_millis(300),
) )
@ -402,7 +402,7 @@ impl ServerBuilder {
// we need to stop system if server was spawned // we need to stop system if server was spawned
if self.exit { if self.exit {
spawn( spawn(
delay_until(Instant::now() + Duration::from_millis(300)) sleep_until(Instant::now() + Duration::from_millis(300))
.then(|_| { .then(|_| {
System::current().stop(); System::current().stop();
ready(()) ready(())

View file

@ -12,7 +12,7 @@ pub use tokio_openssl::SslStream;
use futures::future::{ok, FutureExt, LocalBoxFuture, Ready}; use futures::future::{ok, FutureExt, LocalBoxFuture, Ready};
use crate::codec::{AsyncRead, AsyncWrite}; use crate::codec::{AsyncRead, AsyncWrite};
use crate::rt::time::{delay_for, Delay}; use crate::rt::time::{sleep, Sleep};
use crate::service::{Service, ServiceFactory}; use crate::service::{Service, ServiceFactory};
use crate::util::counter::{Counter, CounterGuard}; use crate::util::counter::{Counter, CounterGuard};
@ -114,7 +114,7 @@ where
delay: if self.timeout == ZERO { delay: if self.timeout == ZERO {
None None
} else { } else {
Some(Box::pin(delay_for(self.timeout))) Some(sleep(self.timeout))
}, },
fut: async move { fut: async move {
let mut io = SslStream::new(ssl, req)?; let mut io = SslStream::new(ssl, req)?;
@ -129,21 +129,27 @@ where
} }
} }
pub struct AcceptorServiceResponse<T> pin_project_lite::pin_project! {
where pub struct AcceptorServiceResponse<T>
T: AsyncRead + AsyncWrite, where
{ T: AsyncRead,
fut: LocalBoxFuture<'static, Result<SslStream<T>, Box<dyn Error>>>, T: AsyncWrite,
delay: Option<Pin<Box<Delay>>>, {
_guard: CounterGuard, fut: LocalBoxFuture<'static, Result<SslStream<T>, Box<dyn Error>>>,
#[pin]
delay: Option<Sleep>,
_guard: CounterGuard,
}
} }
impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceResponse<T> { impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceResponse<T> {
type Output = Result<SslStream<T>, Box<dyn Error>>; type Output = Result<SslStream<T>, Box<dyn Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(ref mut delay) = self.delay { let mut this = self.project();
match Pin::new(delay).poll(cx) {
if let Some(delay) = this.delay.as_pin_mut() {
match delay.poll(cx) {
Poll::Pending => (), Poll::Pending => (),
Poll::Ready(_) => { Poll::Ready(_) => {
return Poll::Ready(Err(Box::new(io::Error::new( return Poll::Ready(Err(Box::new(io::Error::new(
@ -154,7 +160,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceResponse<T> {
} }
} }
let io = futures::ready!(Pin::new(&mut self.fut).poll(cx))?; let io = futures::ready!(Pin::new(&mut this.fut).poll(cx))?;
Poll::Ready(Ok(io)) Poll::Ready(Ok(io))
} }
} }

View file

@ -106,29 +106,34 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Service for AcceptorService<T> {
delay: if self.timeout == ZERO { delay: if self.timeout == ZERO {
None None
} else { } else {
Some(Box::pin(sleep(self.timeout))) Some(sleep(self.timeout))
}, },
} }
} }
} }
pub struct AcceptorServiceFut<T> pin_project_lite::pin_project! {
where pub struct AcceptorServiceFut<T>
T: AsyncRead + AsyncWrite + Unpin, where
{ T: AsyncRead,
fut: Accept<T>, T: AsyncWrite,
delay: Option<Pin<Box<Sleep>>>, T: Unpin,
_guard: CounterGuard, {
fut: Accept<T>,
#[pin]
delay: Option<Sleep>,
_guard: CounterGuard,
}
} }
impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceFut<T> { impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceFut<T> {
type Output = Result<TlsStream<T>, Box<dyn Error>>; type Output = Result<TlsStream<T>, Box<dyn Error>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut(); let mut this = self.project();
if let Some(ref mut delay) = this.delay { if let Some(delay) = this.delay.as_pin_mut() {
match Pin::new(delay).poll(cx) { match delay.poll(cx) {
Poll::Pending => (), Poll::Pending => (),
Poll::Ready(_) => { Poll::Ready(_) => {
return Poll::Ready(Err(Box::new(io::Error::new( return Poll::Ready(Err(Box::new(io::Error::new(

View file

@ -7,7 +7,7 @@ use futures::channel::oneshot;
use futures::future::{join_all, LocalBoxFuture, MapOk}; use futures::future::{join_all, LocalBoxFuture, MapOk};
use futures::{Future, FutureExt, Stream as StdStream, TryFutureExt}; use futures::{Future, FutureExt, Stream as StdStream, TryFutureExt};
use crate::rt::time::{delay_until, Delay, Instant}; use crate::rt::time::{sleep_until, Instant, Sleep};
use crate::rt::{spawn, Arbiter}; use crate::rt::{spawn, Arbiter};
use crate::util::counter::Counter; use crate::util::counter::Counter;
@ -320,8 +320,8 @@ enum WorkerState {
Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>>, Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>>,
), ),
Shutdown( Shutdown(
Pin<Box<Delay>>, Pin<Box<Sleep>>,
Pin<Box<Delay>>, Pin<Box<Sleep>>,
Option<oneshot::Sender<bool>>, Option<oneshot::Sender<bool>>,
), ),
} }
@ -346,10 +346,10 @@ impl Future for Worker {
if num != 0 { if num != 0 {
info!("Graceful worker shutdown, {} connections", num); info!("Graceful worker shutdown, {} connections", num);
self.state = WorkerState::Shutdown( self.state = WorkerState::Shutdown(
Box::pin(delay_until( Box::pin(sleep_until(
Instant::now() + time::Duration::from_secs(1), Instant::now() + time::Duration::from_secs(1),
)), )),
Box::pin(delay_until(Instant::now() + self.shutdown_timeout)), Box::pin(sleep_until(Instant::now() + self.shutdown_timeout)),
Some(result), Some(result),
); );
} else { } else {
@ -437,7 +437,7 @@ impl Future for Worker {
match t1.as_mut().poll(cx) { match t1.as_mut().poll(cx) {
Poll::Pending => (), Poll::Pending => (),
Poll::Ready(_) => { Poll::Ready(_) => {
*t1 = Box::pin(delay_until( *t1 = Box::pin(sleep_until(
Instant::now() + time::Duration::from_secs(1), Instant::now() + time::Duration::from_secs(1),
)); ));
let _ = t1.as_mut().poll(cx); let _ = t1.as_mut().poll(cx);

View file

@ -9,7 +9,7 @@ use futures::future::poll_fn;
use futures::task::AtomicWaker; use futures::task::AtomicWaker;
use crate::codec::{AsyncRead, AsyncWrite, ReadBuf}; use crate::codec::{AsyncRead, AsyncWrite, ReadBuf};
use crate::rt::time::delay_for; use crate::rt::time::sleep;
/// Async io stream /// Async io stream
#[derive(Debug)] #[derive(Debug)]
@ -156,7 +156,7 @@ impl Io {
remote.read = IoState::Close; remote.read = IoState::Close;
remote.waker.wake(); remote.waker.wake();
} }
delay_for(time::Duration::from_millis(35)).await; sleep(time::Duration::from_millis(35)).await;
} }
/// Add extra data to the remote buffer and notify reader /// Add extra data to the remote buffer and notify reader

View file

@ -140,7 +140,7 @@ mod tests {
} }
fn call(&self, _: ()) -> Self::Future { fn call(&self, _: ()) -> Self::Future {
crate::rt::time::delay_for(self.0) crate::rt::time::sleep(self.0)
.then(|_| ok::<_, ()>(())) .then(|_| ok::<_, ()>(()))
.boxed_local() .boxed_local()
} }

View file

@ -8,7 +8,7 @@ use std::time::Duration;
use futures::future::{ok, Ready}; use futures::future::{ok, Ready};
use crate::rt::time::{delay_until, Delay, Instant}; use crate::rt::time::{sleep_until, Instant, Sleep};
use crate::{Service, ServiceFactory}; use crate::{Service, ServiceFactory};
use super::time::{LowResTime, LowResTimeService}; use super::time::{LowResTime, LowResTimeService};
@ -85,7 +85,7 @@ pub struct KeepAliveService<R, E, F> {
} }
struct Inner { struct Inner {
delay: Pin<Box<Delay>>, delay: Pin<Box<Sleep>>,
expire: Instant, expire: Instant,
} }
@ -101,7 +101,7 @@ where
time, time,
inner: RefCell::new(Inner { inner: RefCell::new(Inner {
expire, expire,
delay: Box::pin(delay_until(expire)), delay: Box::pin(sleep_until(expire)),
}), }),
_t: PhantomData, _t: PhantomData,
} }
@ -147,7 +147,7 @@ mod tests {
use futures::future::lazy; use futures::future::lazy;
use super::*; use super::*;
use crate::rt::time::delay_for; use crate::rt::time::sleep;
use crate::service::{Service, ServiceFactory}; use crate::service::{Service, ServiceFactory};
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
@ -167,7 +167,7 @@ mod tests {
assert_eq!(service.call(1usize).await, Ok(1usize)); assert_eq!(service.call(1usize).await, Ok(1usize));
assert!(lazy(|cx| service.poll_ready(cx)).await.is_ready()); assert!(lazy(|cx| service.poll_ready(cx)).await.is_ready());
delay_for(Duration::from_millis(500)).await; sleep(Duration::from_millis(500)).await;
assert_eq!( assert_eq!(
lazy(|cx| service.poll_ready(cx)).await, lazy(|cx| service.poll_ready(cx)).await,
Poll::Ready(Err(TestErr)) Poll::Ready(Err(TestErr))

View file

@ -163,7 +163,7 @@ mod tests {
use super::*; use super::*;
use crate::channel::mpsc; use crate::channel::mpsc;
use crate::codec::Encoder; use crate::codec::Encoder;
use crate::rt::time::delay_for; use crate::rt::time::sleep;
use crate::ws; use crate::ws;
#[crate::rt_test] #[crate::rt_test]
@ -197,7 +197,7 @@ mod tests {
assert_eq!(data, b"\x81\x04test".as_ref()); assert_eq!(data, b"\x81\x04test".as_ref());
drop(tx); drop(tx);
delay_for(Duration::from_millis(10)).await; sleep(Duration::from_millis(10)).await;
assert!(rx.next().await.is_none()); assert!(rx.next().await.is_none());
assert_eq!(counter.get(), 1); assert_eq!(counter.get(), 1);

View file

@ -6,7 +6,7 @@ use std::time::{self, Duration, Instant};
use futures::future::{ok, ready, FutureExt, Ready}; use futures::future::{ok, ready, FutureExt, Ready};
use crate::rt::time::delay_for; use crate::rt::time::sleep;
use crate::service::{Service, ServiceFactory}; use crate::service::{Service, ServiceFactory};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -81,7 +81,7 @@ impl LowResTimeService {
b.resolution b.resolution
}; };
crate::rt::spawn(delay_for(interval).then(move |_| { crate::rt::spawn(sleep(interval).then(move |_| {
inner.borrow_mut().current.take(); inner.borrow_mut().current.take();
ready(()) ready(())
})); }));
@ -148,7 +148,7 @@ impl SystemTimeService {
b.resolution b.resolution
}; };
crate::rt::spawn(delay_for(interval).then(move |_| { crate::rt::spawn(sleep(interval).then(move |_| {
inner.borrow_mut().current.take(); inner.borrow_mut().current.take();
ready(()) ready(())
})); }));
@ -208,7 +208,7 @@ mod tests {
.duration_since(SystemTime::UNIX_EPOCH) .duration_since(SystemTime::UNIX_EPOCH)
.unwrap(); .unwrap();
delay_for(wait_time).await; sleep(wait_time).await;
let second_time = time_service let second_time = time_service
.now() .now()
@ -230,7 +230,7 @@ mod tests {
let first_time = time_service.now(); let first_time = time_service.now();
delay_for(wait_time).await; sleep(wait_time).await;
let second_time = time_service.now(); let second_time = time_service.now();
assert!(second_time - first_time >= wait_time); assert!(second_time - first_time >= wait_time);

View file

@ -10,7 +10,7 @@ use std::{fmt, time};
use futures::future::{ok, Either, Ready}; use futures::future::{ok, Either, Ready};
use crate::rt::time::{delay_for, Delay}; use crate::rt::time::{sleep, Sleep};
use crate::service::{IntoService, Service, Transform}; use crate::service::{IntoService, Service, Transform};
const ZERO: time::Duration = time::Duration::from_millis(0); const ZERO: time::Duration = time::Duration::from_millis(0);
@ -154,21 +154,21 @@ where
} else { } else {
Either::Left(TimeoutServiceResponse { Either::Left(TimeoutServiceResponse {
fut: self.service.call(request), fut: self.service.call(request),
sleep: Box::pin(delay_for(self.timeout)), sleep: Box::pin(sleep(self.timeout)),
}) })
} }
} }
} }
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
/// `TimeoutService` response future /// `TimeoutService` response future
#[doc(hidden)] #[doc(hidden)]
#[derive(Debug)] #[derive(Debug)]
pub struct TimeoutServiceResponse<T: Service> { pub struct TimeoutServiceResponse<T: Service> {
#[pin] #[pin]
fut: T::Future, fut: T::Future,
sleep: Pin<Box<Delay>>, sleep: Pin<Box<Sleep>>,
} }
} }
impl<T> Future for TimeoutServiceResponse<T> impl<T> Future for TimeoutServiceResponse<T>
@ -255,7 +255,7 @@ mod tests {
} }
fn call(&self, _: ()) -> Self::Future { fn call(&self, _: ()) -> Self::Future {
crate::rt::time::delay_for(self.0) crate::rt::time::sleep(self.0)
.then(|_| ok::<_, SrvError>(())) .then(|_| ok::<_, SrvError>(()))
.boxed_local() .boxed_local()
} }

View file

@ -538,7 +538,7 @@ mod tests {
use crate::http::header::{self, HeaderValue}; use crate::http::header::{self, HeaderValue};
use crate::http::{Method, StatusCode}; use crate::http::{Method, StatusCode};
use crate::rt::time::delay_for; use crate::rt::time::sleep;
use crate::web::middleware::DefaultHeaders; use crate::web::middleware::DefaultHeaders;
use crate::web::request::WebRequest; use crate::web::request::WebRequest;
use crate::web::test::{call_service, init_service, TestRequest}; use crate::web::test::{call_service, init_service, TestRequest};
@ -603,7 +603,7 @@ mod tests {
async fn test_to() { async fn test_to() {
let srv = let srv =
init_service(App::new().service(web::resource("/test").to(|| async { init_service(App::new().service(web::resource("/test").to(|| async {
delay_for(Duration::from_millis(100)).await; sleep(Duration::from_millis(100)).await;
HttpResponse::Ok() HttpResponse::Ok()
}))) })))
.await; .await;

View file

@ -285,7 +285,7 @@ mod tests {
use serde_derive::Serialize; use serde_derive::Serialize;
use crate::http::{Method, StatusCode}; use crate::http::{Method, StatusCode};
use crate::rt::time::delay_for; use crate::rt::time::sleep;
use crate::web::test::{call_service, init_service, read_body, TestRequest}; use crate::web::test::{call_service, init_service, read_body, TestRequest};
use crate::web::{self, error, App, DefaultError, HttpResponse}; use crate::web::{self, error, App, DefaultError, HttpResponse};
@ -306,16 +306,16 @@ mod tests {
) )
}), }),
web::post().to(|| async { web::post().to(|| async {
delay_for(Duration::from_millis(100)).await; sleep(Duration::from_millis(100)).await;
HttpResponse::Created() HttpResponse::Created()
}), }),
web::delete().to(|| async { web::delete().to(|| async {
delay_for(Duration::from_millis(100)).await; sleep(Duration::from_millis(100)).await;
Err::<HttpResponse, _>(error::ErrorBadRequest("err")) Err::<HttpResponse, _>(error::ErrorBadRequest("err"))
}), }),
])) ]))
.service(web::resource("/json").route(web::get().to(|| async { .service(web::resource("/json").route(web::get().to(|| async {
delay_for(Duration::from_millis(25)).await; sleep(Duration::from_millis(25)).await;
web::types::Json(MyObject { web::types::Json(MyObject {
name: "test".to_string(), name: "test".to_string(),
}) })

View file

@ -22,7 +22,7 @@ use crate::http::header::{HeaderName, HeaderValue, CONTENT_TYPE};
use crate::http::test::TestRequest as HttpTestRequest; use crate::http::test::TestRequest as HttpTestRequest;
use crate::http::{HttpService, Method, Payload, Request, StatusCode, Uri, Version}; use crate::http::{HttpService, Method, Payload, Request, StatusCode, Uri, Version};
use crate::router::{Path, ResourceDef}; use crate::router::{Path, ResourceDef};
use crate::rt::{time::delay_for, System}; use crate::rt::{time::sleep, System};
use crate::server::Server; use crate::server::Server;
use crate::util::Extensions; use crate::util::Extensions;
use crate::{map_config, IntoService, IntoServiceFactory, Service, ServiceFactory}; use crate::{map_config, IntoService, IntoServiceFactory, Service, ServiceFactory};
@ -955,7 +955,7 @@ impl TestServer {
pub async fn stop(self) { pub async fn stop(self) {
self.server.stop(true).await; self.server.stop(true).await;
self.system.stop(); self.system.stop();
delay_for(time::Duration::from_millis(100)).await; sleep(time::Duration::from_millis(100)).await;
} }
} }

View file

@ -169,7 +169,7 @@ async fn test_form() {
async fn test_timeout() { async fn test_timeout() {
let srv = test::server(|| { let srv = test::server(|| {
App::new().service(web::resource("/").route(web::to(|| async { App::new().service(web::resource("/").route(web::to(|| async {
ntex::rt::time::delay_for(Duration::from_millis(200)).await; ntex::rt::time::sleep(Duration::from_millis(200)).await;
HttpResponse::Ok().body(STR) HttpResponse::Ok().body(STR)
}))) })))
}); });
@ -198,7 +198,7 @@ async fn test_timeout() {
async fn test_timeout_override() { async fn test_timeout_override() {
let srv = test::server(|| { let srv = test::server(|| {
App::new().service(web::resource("/").route(web::to(|| async { App::new().service(web::resource("/").route(web::to(|| async {
ntex::rt::time::delay_for(Duration::from_millis(200)).await; ntex::rt::time::sleep(Duration::from_millis(200)).await;
HttpResponse::Ok().body(STR) HttpResponse::Ok().body(STR)
}))) })))
}); });
@ -841,7 +841,7 @@ async fn client_read_until_eof() {
.write_all(b"HTTP/1.0 200 OK\r\nconnection: close\r\n\r\nwelcome!"); .write_all(b"HTTP/1.0 200 OK\r\nconnection: close\r\n\r\nwelcome!");
} }
}); });
ntex::rt::time::delay_for(Duration::from_millis(300)).await; ntex::rt::time::sleep(Duration::from_millis(300)).await;
// client request // client request
let req = Client::build() let req = Client::build()

View file

@ -11,7 +11,7 @@ use ntex::http::test::server as test_server;
use ntex::http::{ use ntex::http::{
body, header, HttpService, KeepAlive, Method, Request, Response, StatusCode, body, header, HttpService, KeepAlive, Method, Request, Response, StatusCode,
}; };
use ntex::rt::time::delay_for; use ntex::rt::time::sleep;
use ntex::service::fn_service; use ntex::service::fn_service;
use ntex::web::error; use ntex::web::error;
@ -92,7 +92,7 @@ async fn test_expect_continue_h1() {
let srv = test_server(|| { let srv = test_server(|| {
HttpService::build() HttpService::build()
.expect(fn_service(|req: Request| { .expect(fn_service(|req: Request| {
delay_for(Duration::from_millis(20)).then(move |_| { sleep(Duration::from_millis(20)).then(move |_| {
if req.head().uri.query() == Some("yes=") { if req.head().uri.query() == Some("yes=") {
ok(req) ok(req)
} else { } else {