prep release

This commit is contained in:
Nikolay Kim 2021-09-07 12:58:47 +06:00
parent d8ec65e7fc
commit 0546918deb
8 changed files with 23 additions and 282 deletions

View file

@ -1,6 +1,6 @@
# Changes # Changes
## [0.4.0-b.12] - 2021-09-02 ## [0.4.0-b.12] - 2021-09-07
* Fix race in low res timer * Fix race in low res timer

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex" name = "ntex"
version = "0.4.0-b.11" version = "0.4.0-b.12"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services" description = "Framework for composable network services"
readme = "README.md" readme = "README.md"

View file

@ -570,8 +570,8 @@ impl Future for TimerDriver {
if Pin::as_mut(&mut inner.driver_sleep).poll(cx).is_ready() { if Pin::as_mut(&mut inner.driver_sleep).poll(cx).is_ready() {
let now = inner.driver_sleep.deadline(); let now = inner.driver_sleep.deadline();
if counter > 2 { if counter > 3 {
log::error!( log::warn!(
"Nested timer call: {:?}, elapsed: {:?} now: {:?}", "Nested timer call: {:?}, elapsed: {:?} now: {:?}",
counter, counter,
inner.elapsed_time, inner.elapsed_time,
@ -670,7 +670,7 @@ mod tests {
let elapsed = Instant::now() - time; let elapsed = Instant::now() - time;
assert!( assert!(
elapsed > Duration::from_millis(1000) elapsed > Duration::from_millis(1000)
&& elapsed < Duration::from_millis(1200) && elapsed < Duration::from_millis(1250)
); );
let time = Instant::now(); let time = Instant::now();

View file

@ -5,7 +5,6 @@ pub mod inflight;
pub mod keepalive; pub mod keepalive;
pub mod sink; pub mod sink;
pub mod stream; pub mod stream;
pub mod time;
pub mod timeout; pub mod timeout;
pub mod variant; pub mod variant;

View file

@ -1,239 +0,0 @@
use std::task::{Context, Poll};
use std::time::{self, Instant};
use std::{cell::RefCell, convert::Infallible, rc::Rc};
use crate::service::{Service, ServiceFactory};
use crate::time::{sleep, Millis};
use crate::util::Ready;
#[derive(Clone, Debug)]
pub struct LowResTime(Rc<RefCell<Inner>>);
#[derive(Debug)]
struct Inner {
resolution: Millis,
current: Option<Instant>,
}
impl Inner {
fn new(resolution: Millis) -> Self {
Inner {
resolution,
current: None,
}
}
}
impl LowResTime {
/// Create new timer service
pub fn new<T: Into<Millis>>(resolution: T) -> LowResTime {
LowResTime(Rc::new(RefCell::new(Inner::new(resolution.into()))))
}
pub fn timer(&self) -> LowResTimeService {
LowResTimeService(self.0.clone())
}
}
impl Default for LowResTime {
fn default() -> Self {
LowResTime(Rc::new(RefCell::new(Inner::new(Millis(1000)))))
}
}
impl ServiceFactory for LowResTime {
type Request = ();
type Response = Instant;
type Error = Infallible;
type InitError = Infallible;
type Config = ();
type Service = LowResTimeService;
type Future = Ready<Self::Service, Self::InitError>;
#[inline]
fn new_service(&self, _: ()) -> Self::Future {
Ready::Ok(self.timer())
}
}
#[derive(Clone, Debug)]
pub struct LowResTimeService(Rc<RefCell<Inner>>);
impl LowResTimeService {
pub fn new<T: Into<Millis>>(resolution: T) -> LowResTimeService {
LowResTimeService(Rc::new(RefCell::new(Inner::new(resolution.into()))))
}
/// Get current time. This function has to be called from
/// future's poll method, otherwise it panics.
pub fn now(&self) -> Instant {
let cur = self.0.borrow().current;
if let Some(cur) = cur {
cur
} else {
let now = Instant::now();
let inner = self.0.clone();
let interval = {
let mut b = inner.borrow_mut();
b.current = Some(now);
b.resolution
};
crate::rt::spawn(async move {
sleep(interval).await;
inner.borrow_mut().current.take();
});
now
}
}
}
impl Service for LowResTimeService {
type Request = ();
type Response = Instant;
type Error = Infallible;
type Future = Ready<Self::Response, Self::Error>;
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
#[inline]
fn call(&self, _: ()) -> Self::Future {
Ready::Ok(self.now())
}
}
#[derive(Clone, Debug)]
pub struct SystemTime(Rc<RefCell<SystemTimeInner>>);
#[derive(Debug)]
struct SystemTimeInner {
resolution: Millis,
current: Option<time::SystemTime>,
}
impl SystemTimeInner {
fn new(resolution: Millis) -> Self {
SystemTimeInner {
resolution,
current: None,
}
}
}
#[derive(Clone, Debug)]
pub struct SystemTimeService(Rc<RefCell<SystemTimeInner>>);
impl SystemTimeService {
/// Create new system time service
pub fn new<T: Into<Millis>>(resolution: T) -> SystemTimeService {
SystemTimeService(Rc::new(RefCell::new(SystemTimeInner::new(
resolution.into(),
))))
}
/// Get current time. This function has to be called from
/// future's poll method, otherwise it panics.
pub fn now(&self) -> time::SystemTime {
let cur = self.0.borrow().current;
if let Some(cur) = cur {
cur
} else {
let now = time::SystemTime::now();
let inner = self.0.clone();
let interval = {
let mut b = inner.borrow_mut();
b.current = Some(now);
b.resolution
};
crate::rt::spawn(async move {
sleep(interval).await;
inner.borrow_mut().current.take();
});
now
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{time::sleep, util::lazy};
use std::time::{Duration, SystemTime};
#[crate::rt_test]
async fn low_res_timee() {
let f = LowResTime::default();
let srv = f.new_service(()).await.unwrap();
assert!(lazy(|cx| srv.poll_ready(cx)).await.is_ready());
srv.call(()).await.unwrap();
}
/// State Under Test: Two calls of `SystemTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`.
///
/// Expected Behavior: Two back-to-back calls of `SystemTimeService::now()` return the same value.
#[crate::rt_test]
async fn system_time_service_time_does_not_immediately_change() {
let resolution = Duration::from_millis(50);
let time_service = SystemTimeService::new(resolution);
assert_eq!(time_service.now(), time_service.now());
}
/// State Under Test: Two calls of `LowResTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`.
///
/// Expected Behavior: Two back-to-back calls of `LowResTimeService::now()` return the same value.
#[crate::rt_test]
async fn lowres_time_service_time_does_not_immediately_change() {
let resolution = Duration::from_millis(50);
let time_service = LowResTimeService::new(resolution);
assert_eq!(time_service.now(), time_service.now());
}
/// State Under Test: `SystemTimeService::now()` updates returned value every resolution period.
///
/// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values
/// and second value is greater than the first one at least by a resolution interval.
#[crate::rt_test]
async fn system_time_service_time_updates_after_resolution_interval() {
let resolution = Duration::from_millis(100);
let wait_time = 300;
let time_service = SystemTimeService::new(resolution);
let first_time = time_service
.now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
sleep(Millis(wait_time)).await;
let second_time = time_service
.now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
assert!(second_time - first_time >= Duration::from_millis(wait_time));
}
/// State Under Test: `LowResTimeService::now()` updates returned value every resolution period.
///
/// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values
/// and second value is greater than the first one at least by a resolution interval.
#[crate::rt_test]
async fn lowres_time_service_time_updates_after_resolution_interval() {
let resolution = Duration::from_millis(100);
let wait_time = 300;
let time_service = LowResTimeService::new(resolution);
let first_time = time_service.now();
sleep(Millis(wait_time)).await;
let second_time = time_service.now();
assert!(second_time - first_time >= Duration::from_millis(wait_time));
}
}

View file

@ -62,23 +62,6 @@ macro_rules! variant_impl_and ({$fac1_type:ident, $fac2_type:ident, $name:ident,
V1: ServiceFactory, V1: ServiceFactory,
V1::Config: Clone, V1::Config: Clone,
{ {
#[doc(hidden)]
/// Convert to a Variant with more request types
pub fn and<$name, F>(self, factory: F) -> $fac2_type<V1, $($T,)+ $name>
where $name: ServiceFactory<
Config = V1::Config,
Response = V1::Response,
Error = V1::Error,
InitError = V1::InitError>,
F: IntoServiceFactory<$name>,
{
$fac2_type {
A: self.A,
$($T: self.$T,)+
$name: factory.into_factory(),
}
}
/// Convert to a Variant with more request types /// Convert to a Variant with more request types
pub fn $m_name<$name, F>(self, factory: F) -> $fac2_type<V1, $($T,)+ $name> pub fn $m_name<$name, F>(self, factory: F) -> $fac2_type<V1, $($T,)+ $name>
where $name: ServiceFactory< where $name: ServiceFactory<
@ -231,8 +214,8 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident,
} }
#[doc(hidden)]
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
#[doc(hidden)]
pub struct ServiceFactoryResponse<A: ServiceFactory, $($T: ServiceFactory),+> { pub struct ServiceFactoryResponse<A: ServiceFactory, $($T: ServiceFactory),+> {
pub(super) a: Option<A::Service>, pub(super) a: Option<A::Service>,
pub(super) items: ($(Option<$T::Service>,)+), pub(super) items: ($(Option<$T::Service>,)+),
@ -372,8 +355,8 @@ mod tests {
#[crate::rt_test] #[crate::rt_test]
async fn test_variant() { async fn test_variant() {
let factory = variant(fn_factory(|| async { Ok::<_, ()>(Srv1) })) let factory = variant(fn_factory(|| async { Ok::<_, ()>(Srv1) }))
.and(fn_factory(|| async { Ok::<_, ()>(Srv2) })) .v2(fn_factory(|| async { Ok::<_, ()>(Srv2) }))
.and(fn_factory(|| async { Ok::<_, ()>(Srv2) })) .v3(fn_factory(|| async { Ok::<_, ()>(Srv2) }))
.clone(); .clone();
let service = factory.new_service(&()).await.clone().unwrap(); let service = factory.new_service(&()).await.clone().unwrap();

View file

@ -23,14 +23,14 @@ impl<E: fmt::Debug> From<ProtocolError> for StreamError<E> {
} }
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
/// Stream ws protocol decoder. /// Stream ws protocol decoder.
pub struct StreamDecoder<S, E> { pub struct StreamDecoder<S, E> {
#[pin] #[pin]
stream: S, stream: S,
codec: Codec, codec: Codec,
buf: BytesMut, buf: BytesMut,
_t: PhantomData<E>, _t: PhantomData<E>,
} }
} }
impl<S, E> StreamDecoder<S, E> { impl<S, E> StreamDecoder<S, E> {
@ -87,13 +87,13 @@ where
} }
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
/// Stream ws protocol decoder. /// Stream ws protocol decoder.
#[derive(Clone)] #[derive(Clone)]
pub struct StreamEncoder<S> { pub struct StreamEncoder<S> {
#[pin] #[pin]
sink: S, sink: S,
codec: Rc<RefCell<Codec>>, codec: Rc<RefCell<Codec>>,
} }
} }
impl<S> StreamEncoder<S> { impl<S> StreamEncoder<S> {

View file

@ -67,8 +67,6 @@ async fn web_ws() {
#[ntex::test] #[ntex::test]
async fn web_ws_client() { async fn web_ws_client() {
env_logger::init();
let srv = test::server(|| { let srv = test::server(|| {
App::new().service(web::resource("/").route(web::to( App::new().service(web::resource("/").route(web::to(
|req: HttpRequest, pl: web::types::Payload| async move { |req: HttpRequest, pl: web::types::Payload| async move {