Read frame timeout (#241)

This commit is contained in:
Nikolay Kim 2023-11-10 12:42:23 +06:00 committed by GitHub
parent e020bb5296
commit a32e25d72d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 598 additions and 156 deletions

View file

@ -35,3 +35,5 @@ ntex-util = { path = "ntex-util" }
ntex-glommio = { path = "ntex-glommio" } ntex-glommio = { path = "ntex-glommio" }
ntex-tokio = { path = "ntex-tokio" } ntex-tokio = { path = "ntex-tokio" }
ntex-async-std = { path = "ntex-async-std" } ntex-async-std = { path = "ntex-async-std" }
ntex-h2 = { git = "https://github.com/ntex-rs/ntex-h2.git" }

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.3.6] - 2023-11-xx
* Add DispatcherConfig type
## [0.3.5] - 2023-11-03 ## [0.3.5] - 2023-11-03
* Add Io::force_ready_ready() and Io::poll_force_ready_ready() methods * Add Io::force_ready_ready() and Io::poll_force_ready_ready() methods

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-io" name = "ntex-io"
version = "0.3.5" version = "0.3.6"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames" description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -17,9 +17,9 @@ path = "src/lib.rs"
[dependencies] [dependencies]
ntex-codec = "0.6.2" ntex-codec = "0.6.2"
ntex-bytes = "0.1.19" ntex-bytes = "0.1.20"
ntex-util = "0.3.2" ntex-util = "0.3.4"
ntex-service = "1.2.6" ntex-service = "1.2.7"
bitflags = "2.4" bitflags = "2.4"
log = "0.4" log = "0.4"

View file

@ -4,13 +4,110 @@ use std::{cell::Cell, future, pin::Pin, rc::Rc, task::Context, task::Poll, time}
use ntex_bytes::Pool; use ntex_bytes::Pool;
use ntex_codec::{Decoder, Encoder}; use ntex_codec::{Decoder, Encoder};
use ntex_service::{IntoService, Pipeline, Service}; use ntex_service::{IntoService, Pipeline, Service};
use ntex_util::time::Seconds; use ntex_util::time::{now, Seconds};
use ntex_util::{future::Either, ready, spawn}; use ntex_util::{future::Either, ready, spawn};
use crate::{DispatchItem, IoBoxed, IoStatusUpdate, RecvError}; use crate::{DispatchItem, IoBoxed, IoStatusUpdate, RecvError};
const ONE_SEC: time::Duration = time::Duration::from_secs(1);
type Response<U> = <U as Encoder>::Item; type Response<U> = <U as Encoder>::Item;
#[derive(Clone, Debug)]
/// Shared dispatcher configuration
pub struct DispatcherConfig(Rc<DispatcherConfigInner>);
#[derive(Debug)]
struct DispatcherConfigInner {
keepalive_timeout: Cell<Seconds>,
disconnect_timeout: Cell<Seconds>,
frame_read_rate: Cell<u16>,
frame_read_timeout: Cell<Seconds>,
frame_read_max_timeout: Cell<Seconds>,
}
impl Default for DispatcherConfig {
fn default() -> Self {
DispatcherConfig(Rc::new(DispatcherConfigInner {
keepalive_timeout: Cell::new(Seconds(30)),
disconnect_timeout: Cell::new(Seconds(1)),
frame_read_rate: Cell::new(0),
frame_read_timeout: Cell::new(Seconds::ZERO),
frame_read_max_timeout: Cell::new(Seconds::ZERO),
}))
}
}
impl DispatcherConfig {
#[inline]
/// Get keep-alive timeout
pub fn keepalive_timeout(&self) -> Seconds {
self.0.keepalive_timeout.get()
}
#[inline]
/// Get disconnect timeout
pub fn disconnect_timeout(&self) -> Seconds {
self.0.disconnect_timeout.get()
}
#[inline]
/// Get frame read rate
pub fn frame_read_rate(&self) -> Option<(Seconds, Seconds, u16)> {
let to = self.0.frame_read_timeout.get();
if to.is_zero() {
None
} else {
Some((
to,
self.0.frame_read_max_timeout.get(),
self.0.frame_read_rate.get(),
))
}
}
/// Set keep-alive timeout in seconds.
///
/// To disable timeout set value to 0.
///
/// By default keep-alive timeout is set to 30 seconds.
pub fn set_keepalive_timeout(&self, timeout: Seconds) -> &Self {
self.0.keepalive_timeout.set(timeout);
self
}
/// Set connection disconnect timeout.
///
/// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
/// within this time, the connection get dropped.
///
/// To disable timeout set value to 0.
///
/// By default disconnect timeout is set to 1 seconds.
pub fn set_disconnect_timeout(&self, timeout: Seconds) -> &Self {
self.0.disconnect_timeout.set(timeout);
self
}
/// Set read rate parameters for single frame.
///
/// Set max timeout for reading single frame. If the client sends data,
/// increase the timeout by 1 second for every.
///
/// By default frame read rate is disabled.
pub fn set_frame_read_rate(
&self,
timeout: Seconds,
max_timeout: Seconds,
rate: u16,
) -> &Self {
self.0.frame_read_timeout.set(timeout);
self.0.frame_read_max_timeout.set(max_timeout);
self.0.frame_read_rate.set(rate);
self
}
}
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
/// Dispatcher - is a future that reads frames from bytes stream /// Dispatcher - is a future that reads frames from bytes stream
/// and pass then to the service. /// and pass then to the service.
@ -27,8 +124,9 @@ pin_project_lite::pin_project! {
bitflags::bitflags! { bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 { struct Flags: u8 {
const READY_ERR = 0b0001; const READY_ERR = 0b0001;
const IO_ERR = 0b0010; const IO_ERR = 0b0010;
const TIMEOUT = 0b0100;
} }
} }
@ -38,11 +136,14 @@ where
U: Encoder + Decoder, U: Encoder + Decoder,
{ {
st: Cell<DispatcherState>, st: Cell<DispatcherState>,
ka_timeout: Cell<time::Duration>,
error: Cell<Option<S::Error>>, error: Cell<Option<S::Error>>,
flags: Cell<Flags>, flags: Cell<Flags>,
shared: Rc<DispatcherShared<S, U>>, shared: Rc<DispatcherShared<S, U>>,
pool: Pool, pool: Pool,
cfg: DispatcherConfig,
read_timeout: Cell<time::Instant>,
read_max_timeout: Cell<time::Instant>,
read_bytes: Cell<u32>,
} }
pub(crate) struct DispatcherShared<S, U> pub(crate) struct DispatcherShared<S, U>
@ -92,16 +193,20 @@ where
U: Decoder + Encoder, U: Decoder + Encoder,
{ {
/// Construct new `Dispatcher` instance. /// Construct new `Dispatcher` instance.
pub fn new<Io, F>(io: Io, codec: U, service: F) -> Dispatcher<S, U> pub fn with_config<Io, F>(
io: Io,
codec: U,
service: F,
cfg: &DispatcherConfig,
) -> Dispatcher<S, U>
where where
IoBoxed: From<Io>, IoBoxed: From<Io>,
F: IntoService<S, DispatchItem<U>>, F: IntoService<S, DispatchItem<U>>,
{ {
let io = IoBoxed::from(io); let io = IoBoxed::from(io);
let ka_timeout = Cell::new(Seconds(30).into());
// register keepalive timer // register keepalive timer
io.start_keepalive_timer(ka_timeout.get()); io.start_keepalive_timer(cfg.keepalive_timeout().into());
let pool = io.memory_pool().pool(); let pool = io.memory_pool().pool();
let shared = Rc::new(DispatcherShared { let shared = Rc::new(DispatcherShared {
@ -116,13 +221,27 @@ where
inner: DispatcherInner { inner: DispatcherInner {
pool, pool,
shared, shared,
ka_timeout, cfg: cfg.clone(),
error: Cell::new(None), error: Cell::new(None),
flags: Cell::new(Flags::empty()), flags: Cell::new(Flags::empty()),
read_timeout: Cell::new(now()),
read_max_timeout: Cell::new(now()),
read_bytes: Cell::new(0),
st: Cell::new(DispatcherState::Processing), st: Cell::new(DispatcherState::Processing),
}, },
} }
} }
#[doc(hidden)]
#[deprecated(since = "0.3.6", note = "Use Dispatcher::with_config() method")]
/// Construct new `Dispatcher` instance.
pub fn new<Io, F>(io: Io, codec: U, service: F) -> Dispatcher<S, U>
where
IoBoxed: From<Io>,
F: IntoService<S, DispatchItem<U>>,
{
Self::with_config(io, codec, service, &DispatcherConfig::default())
}
} }
impl<S, U> Dispatcher<S, U> impl<S, U> Dispatcher<S, U>
@ -130,21 +249,22 @@ where
S: Service<DispatchItem<U>, Response = Option<Response<U>>>, S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
U: Decoder + Encoder, U: Decoder + Encoder,
{ {
#[doc(hidden)]
#[deprecated(since = "0.3.6", note = "Use DispatcherConfig methods")]
/// Set keep-alive timeout. /// Set keep-alive timeout.
/// ///
/// To disable timeout set value to 0. /// To disable timeout set value to 0.
/// ///
/// By default keep-alive timeout is set to 30 seconds. /// By default keep-alive timeout is set to 30 seconds.
pub fn keepalive_timeout(self, timeout: Seconds) -> Self { pub fn keepalive_timeout(self, timeout: Seconds) -> Self {
let ka_timeout = time::Duration::from(timeout);
// register keepalive timer // register keepalive timer
self.inner.shared.io.start_keepalive_timer(ka_timeout); self.inner.shared.io.start_keepalive_timer(timeout.into());
self.inner.ka_timeout.set(ka_timeout); self.inner.cfg.set_keepalive_timeout(timeout);
self self
} }
#[doc(hidden)]
#[deprecated(since = "0.3.6", note = "Use DispatcherConfig methods")]
/// Set connection disconnect timeout in seconds. /// Set connection disconnect timeout in seconds.
/// ///
/// Defines a timeout for disconnect connection. If a disconnect procedure does not complete /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
@ -154,7 +274,7 @@ where
/// ///
/// By default disconnect timeout is set to 1 seconds. /// By default disconnect timeout is set to 1 seconds.
pub fn disconnect_timeout(self, val: Seconds) -> Self { pub fn disconnect_timeout(self, val: Seconds) -> Self {
self.inner.shared.io.set_disconnect_timeout(val.into()); self.inner.shared.io.set_disconnect_timeout(val);
self self
} }
} }
@ -200,23 +320,31 @@ where
loop { loop {
match slf.st.get() { match slf.st.get() {
DispatcherState::Processing => { DispatcherState::Processing => {
let item = match ready!(slf.poll_service( let srv = ready!(slf.poll_service(&this.inner.shared.service, cx, io));
&this.inner.shared.service, let item = match srv {
cx,
io
)) {
PollService::Ready => { PollService::Ready => {
// decode incoming bytes if buffer is ready // decode incoming bytes if buffer is ready
match ready!(io.poll_recv(&slf.shared.codec, cx)) { match io.poll_recv_decode(&slf.shared.codec, cx) {
Ok(el) => { Ok(decoded) => {
slf.update_keepalive(); if let Some(el) = decoded.item {
DispatchItem::Item(el) slf.update_keepalive();
slf.remove_timeout();
DispatchItem::Item(el)
} else {
slf.update_timeout(decoded.remains);
return Poll::Pending;
}
} }
Err(RecvError::KeepAlive) => { Err(RecvError::KeepAlive) => {
log::trace!("keep-alive error, stopping dispatcher"); log::trace!("keep-alive error, stopping dispatcher");
slf.st.set(DispatcherState::Stop); slf.st.set(DispatcherState::Stop);
DispatchItem::KeepAliveTimeout DispatchItem::KeepAliveTimeout
} }
Err(RecvError::Timeout) => {
log::trace!("timeout error, stopping dispatcher");
slf.st.set(DispatcherState::Stop);
DispatchItem::ReadTimeout
}
Err(RecvError::Stop) => { Err(RecvError::Stop) => {
log::trace!("dispatcher is instructed to stop"); log::trace!("dispatcher is instructed to stop");
slf.st.set(DispatcherState::Stop); slf.st.set(DispatcherState::Stop);
@ -259,6 +387,8 @@ where
} }
// handle write back-pressure // handle write back-pressure
DispatcherState::Backpressure => { DispatcherState::Backpressure => {
slf.shared.io.stop_keepalive_timer();
let result = let result =
ready!(slf.poll_service(&this.inner.shared.service, cx, io)); ready!(slf.poll_service(&this.inner.shared.service, cx, io));
let item = match result { let item = match result {
@ -277,6 +407,7 @@ where
// call service // call service
let shared = slf.shared.clone(); let shared = slf.shared.clone();
shared.inflight.set(shared.inflight.get() + 1); shared.inflight.set(shared.inflight.get() + 1);
slf.update_keepalive();
spawn(async move { spawn(async move {
let result = shared.service.call(item).await; let result = shared.service.call(item).await;
shared.handle_result(result, &shared.io); shared.handle_result(result, &shared.io);
@ -284,7 +415,7 @@ where
} }
// drain service responses and shutdown io // drain service responses and shutdown io
DispatcherState::Stop => { DispatcherState::Stop => {
slf.unregister_keepalive(); slf.shared.io.stop_keepalive_timer();
// service may relay on poll_ready for response results // service may relay on poll_ready for response results
if !slf.flags.get().contains(Flags::READY_ERR) { if !slf.flags.get().contains(Flags::READY_ERR) {
@ -300,7 +431,8 @@ where
match ready!(slf.shared.io.poll_status_update(cx)) { match ready!(slf.shared.io.poll_status_update(cx)) {
IoStatusUpdate::PeerGone(_) IoStatusUpdate::PeerGone(_)
| IoStatusUpdate::Stop | IoStatusUpdate::Stop
| IoStatusUpdate::KeepAlive => { | IoStatusUpdate::KeepAlive
| IoStatusUpdate::Timeout => {
slf.insert_flags(Flags::IO_ERR); slf.insert_flags(Flags::IO_ERR);
continue; continue;
} }
@ -375,6 +507,11 @@ where
self.st.set(DispatcherState::Stop); self.st.set(DispatcherState::Stop);
Poll::Ready(PollService::Item(DispatchItem::KeepAliveTimeout)) Poll::Ready(PollService::Item(DispatchItem::KeepAliveTimeout))
} }
IoStatusUpdate::Timeout => {
log::trace!("read timeout, stopping dispatcher during pause");
self.st.set(DispatcherState::Stop);
Poll::Ready(PollService::Item(DispatchItem::ReadTimeout))
}
IoStatusUpdate::Stop => { IoStatusUpdate::Stop => {
log::trace!("dispatcher is instructed to stop during pause"); log::trace!("dispatcher is instructed to stop during pause");
self.st.set(DispatcherState::Stop); self.st.set(DispatcherState::Stop);
@ -410,13 +547,59 @@ where
/// update keep-alive timer /// update keep-alive timer
fn update_keepalive(&self) { fn update_keepalive(&self) {
self.shared.io.start_keepalive_timer(self.ka_timeout.get()); self.shared
.io
.start_keepalive_timer(self.cfg.keepalive_timeout().into());
} }
/// unregister keep-alive timer fn remove_timeout(&self) {
fn unregister_keepalive(&self) { let mut flags = self.flags.get();
self.shared.io.stop_keepalive_timer();
self.ka_timeout.set(time::Duration::ZERO); if self.flags.get().contains(Flags::TIMEOUT) {
flags.remove(Flags::TIMEOUT);
self.flags.set(flags);
self.shared.io.stop_timer(self.read_timeout.get());
}
}
fn update_timeout(&self, remains: usize) {
if let Some((period, max, rate)) = self.cfg.frame_read_rate() {
let bytes = remains as u32;
let mut flags = self.flags.get();
if flags.contains(Flags::TIMEOUT) {
// update existing timeout
let delta = (bytes - self.read_bytes.get())
.try_into()
.unwrap_or(u16::MAX);
if delta >= rate {
let n = now();
let to = self.read_timeout.get();
let next = to + ONE_SEC;
let new_timeout = if n >= next { ONE_SEC } else { next - n };
// max timeout
if max.is_zero() || (n + new_timeout) <= self.read_max_timeout.get() {
self.shared.io.stop_timer(to);
self.read_bytes.set(bytes);
self.read_timeout
.set(self.shared.io.start_timer(new_timeout));
}
}
} else if remains != 0 {
// we got new data but not enough to parse single frame
flags.insert(Flags::TIMEOUT);
self.flags.set(flags);
self.read_bytes.set(bytes);
self.read_timeout
.set(self.shared.io.start_timer(period.into()));
if !max.is_zero() {
self.read_max_timeout.set(now() + time::Duration::from(max));
}
}
}
} }
} }
@ -424,9 +607,9 @@ where
mod tests { mod tests {
use rand::Rng; use rand::Rng;
use std::sync::{atomic::AtomicBool, atomic::Ordering::Relaxed, Arc, Mutex}; use std::sync::{atomic::AtomicBool, atomic::Ordering::Relaxed, Arc, Mutex};
use std::{cell::RefCell, time::Duration}; use std::{cell::RefCell, io, time::Duration};
use ntex_bytes::{Bytes, PoolId, PoolRef}; use ntex_bytes::{Bytes, BytesMut, PoolId, PoolRef};
use ntex_codec::BytesCodec; use ntex_codec::BytesCodec;
use ntex_service::ServiceCtx; use ntex_service::ServiceCtx;
use ntex_util::{future::Ready, time::sleep, time::Millis, time::Seconds}; use ntex_util::{future::Ready, time::sleep, time::Millis, time::Seconds};
@ -455,6 +638,32 @@ mod tests {
} }
} }
#[derive(Copy, Clone)]
struct BCodec(usize);
impl Encoder for BCodec {
type Item = Bytes;
type Error = io::Error;
fn encode(&self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
dst.extend_from_slice(&item[..]);
Ok(())
}
}
impl Decoder for BCodec {
type Item = BytesMut;
type Error = io::Error;
fn decode(&self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.len() < self.0 {
Ok(None)
} else {
Ok(Some(src.split_to(self.0)))
}
}
}
impl<S, U> Dispatcher<S, U> impl<S, U> Dispatcher<S, U>
where where
S: Service<DispatchItem<U>, Response = Option<Response<U>>> + 'static, S: Service<DispatchItem<U>, Response = Option<Response<U>>> + 'static,
@ -468,7 +677,9 @@ mod tests {
) -> (Self, State) { ) -> (Self, State) {
let state = Io::new(io); let state = Io::new(io);
let pool = state.memory_pool().pool(); let pool = state.memory_pool().pool();
let ka_timeout = Cell::new(Seconds(1).into()); let cfg = DispatcherConfig::default()
.set_keepalive_timeout(Seconds(1))
.clone();
let inner = State(state.get_ref()); let inner = State(state.get_ref());
state.start_keepalive_timer(Duration::from_millis(500)); state.start_keepalive_timer(Duration::from_millis(500));
@ -487,9 +698,12 @@ mod tests {
error: Cell::new(None), error: Cell::new(None),
flags: Cell::new(super::Flags::empty()), flags: Cell::new(super::Flags::empty()),
st: Cell::new(DispatcherState::Processing), st: Cell::new(DispatcherState::Processing),
read_timeout: Cell::new(time::Instant::now()),
read_max_timeout: Cell::new(time::Instant::now()),
read_bytes: Cell::new(0),
pool, pool,
shared, shared,
ka_timeout, cfg,
}, },
}, },
inner, inner,
@ -548,6 +762,7 @@ mod tests {
} }
}), }),
); );
#[allow(deprecated)]
spawn(async move { spawn(async move {
let _ = disp.disconnect_timeout(Seconds(1)).await; let _ = disp.disconnect_timeout(Seconds(1)).await;
}); });
@ -747,7 +962,7 @@ mod tests {
}), }),
), ),
); );
let disp = disp.keepalive_timeout(Seconds::ZERO); disp.inner.cfg.set_keepalive_timeout(Seconds::ZERO);
let pool = PoolId::P10.pool_ref(); let pool = PoolId::P10.pool_ref();
pool.set_read_params(1024, 512); pool.set_read_params(1024, 512);
state.set_memory_pool(pool); state.set_memory_pool(pool);
@ -802,13 +1017,14 @@ mod tests {
} }
}), }),
); );
#[allow(deprecated)]
spawn(async move { spawn(async move {
let _ = disp let _ = disp
.keepalive_timeout(Seconds::ZERO) .keepalive_timeout(Seconds::ZERO)
.keepalive_timeout(Seconds(1)) .keepalive_timeout(Seconds(1))
.await; .await;
}); });
state.0 .0.disconnect_timeout.set(Millis::ONE_SEC); state.0 .0.disconnect_timeout.set(Seconds(1));
let buf = client.read().await.unwrap(); let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n")); assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"));
@ -821,6 +1037,61 @@ mod tests {
assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1]); assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1]);
} }
#[ntex::test]
async fn test_read_timeout() {
let (client, server) = IoTest::create();
client.remote_buffer_cap(1024);
let data = Arc::new(Mutex::new(RefCell::new(Vec::new())));
let data2 = data.clone();
let (disp, state) = Dispatcher::debug(
server,
BCodec(8),
ntex_service::fn_service(move |msg: DispatchItem<BCodec>| {
let data = data2.clone();
async move {
match msg {
DispatchItem::Item(bytes) => {
data.lock().unwrap().borrow_mut().push(0);
return Ok::<_, ()>(Some(bytes.freeze()));
}
DispatchItem::ReadTimeout => {
data.lock().unwrap().borrow_mut().push(1);
}
_ => (),
}
Ok(None)
}
}),
);
spawn(async move {
disp.inner
.cfg
.set_keepalive_timeout(Seconds::ZERO)
.set_frame_read_rate(Seconds(1), Seconds(2), 2);
let _ = disp.await;
});
client.write("12345678");
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"12345678"));
client.write("1");
sleep(Millis(500)).await;
assert!(!state.flags().contains(Flags::IO_STOPPING));
client.write("23");
sleep(Millis(500)).await;
assert!(!state.flags().contains(Flags::IO_STOPPING));
client.write("4");
sleep(Millis(1100)).await;
// write side must be closed, dispatcher should fail with keep-alive
assert!(state.flags().contains(Flags::IO_STOPPING));
assert!(client.is_closed());
assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1]);
}
#[ntex::test] #[ntex::test]
async fn test_unhandled_data() { async fn test_unhandled_data() {
let handled = Arc::new(AtomicBool::new(false)); let handled = Arc::new(AtomicBool::new(false));

View file

@ -97,14 +97,18 @@ impl Filter for Base {
if flags.contains(Flags::IO_STOPPED) { if flags.contains(Flags::IO_STOPPED) {
Poll::Ready(WriteStatus::Terminate) Poll::Ready(WriteStatus::Terminate)
} else if flags.intersects(Flags::IO_STOPPING) { } else if flags.intersects(Flags::IO_STOPPING) {
Poll::Ready(WriteStatus::Shutdown(self.0 .0.disconnect_timeout.get())) Poll::Ready(WriteStatus::Shutdown(
self.0 .0.disconnect_timeout.get().into(),
))
} else if flags.contains(Flags::IO_STOPPING_FILTERS) } else if flags.contains(Flags::IO_STOPPING_FILTERS)
&& !flags.contains(Flags::IO_FILTERS_TIMEOUT) && !flags.contains(Flags::IO_FILTERS_TIMEOUT)
{ {
flags.insert(Flags::IO_FILTERS_TIMEOUT); flags.insert(Flags::IO_FILTERS_TIMEOUT);
self.0.set_flags(flags); self.0.set_flags(flags);
self.0 .0.write_task.register(cx.waker()); self.0 .0.write_task.register(cx.waker());
Poll::Ready(WriteStatus::Timeout(self.0 .0.disconnect_timeout.get())) Poll::Ready(WriteStatus::Timeout(
self.0 .0.disconnect_timeout.get().into(),
))
} else { } else {
self.0 .0.write_task.register(cx.waker()); self.0 .0.write_task.register(cx.waker());
Poll::Ready(WriteStatus::Ready) Poll::Ready(WriteStatus::Ready)

View file

@ -4,14 +4,14 @@ use std::{fmt, future::Future, hash, io, marker, mem, ops, pin::Pin, ptr, rc::Rc
use ntex_bytes::{PoolId, PoolRef}; use ntex_bytes::{PoolId, PoolRef};
use ntex_codec::{Decoder, Encoder}; use ntex_codec::{Decoder, Encoder};
use ntex_util::time::{now, Millis}; use ntex_util::time::{now, Seconds};
use ntex_util::{future::poll_fn, future::Either, task::LocalWaker}; use ntex_util::{future::poll_fn, future::Either, task::LocalWaker};
use crate::buf::Stack; use crate::buf::Stack;
use crate::filter::{Base, Filter, Layer, NullFilter}; use crate::filter::{Base, Filter, Layer, NullFilter};
use crate::seal::Sealed; use crate::seal::Sealed;
use crate::tasks::{ReadContext, WriteContext}; use crate::tasks::{ReadContext, WriteContext};
use crate::{FilterLayer, Handle, IoStatusUpdate, IoStream, RecvError}; use crate::{Decoded, FilterLayer, Handle, IoStatusUpdate, IoStream, RecvError};
bitflags::bitflags! { bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
@ -45,9 +45,11 @@ bitflags::bitflags! {
const DSP_STOP = 0b0001_0000_0000_0000; const DSP_STOP = 0b0001_0000_0000_0000;
/// keep-alive timeout occured /// keep-alive timeout occured
const DSP_KEEPALIVE = 0b0010_0000_0000_0000; const DSP_KEEPALIVE = 0b0010_0000_0000_0000;
/// custom timeout occured
const DSP_TIMEOUT = 0b0100_0000_0000_0000;
/// keep-alive timeout started /// keep-alive timeout started
const KEEPALIVE = 0b0100_0000_0000_0000; const KEEPALIVE = 0b1000_0000_0000_0000;
} }
} }
@ -60,7 +62,7 @@ pub struct IoRef(pub(super) Rc<IoState>);
pub(crate) struct IoState { pub(crate) struct IoState {
pub(super) flags: Cell<Flags>, pub(super) flags: Cell<Flags>,
pub(super) pool: Cell<PoolRef>, pub(super) pool: Cell<PoolRef>,
pub(super) disconnect_timeout: Cell<Millis>, pub(super) disconnect_timeout: Cell<Seconds>,
pub(super) error: Cell<Option<io::Error>>, pub(super) error: Cell<Option<io::Error>>,
pub(super) read_task: LocalWaker, pub(super) read_task: LocalWaker,
pub(super) write_task: LocalWaker, pub(super) write_task: LocalWaker,
@ -91,13 +93,18 @@ impl IoState {
} }
} }
pub(super) fn notify_keepalive(&self) { pub(super) fn notify_timeout(&self, custom: bool) {
log::trace!("keep-alive timeout, notify dispatcher");
let mut flags = self.flags.get(); let mut flags = self.flags.get();
flags.remove(Flags::KEEPALIVE); if custom {
if !flags.contains(Flags::DSP_KEEPALIVE) { flags.insert(Flags::DSP_TIMEOUT);
flags.insert(Flags::DSP_KEEPALIVE);
self.dispatch_task.wake(); self.dispatch_task.wake();
} else {
log::trace!("keep-alive timeout, notify dispatcher");
flags.remove(Flags::KEEPALIVE);
if !flags.contains(Flags::DSP_KEEPALIVE) {
flags.insert(Flags::DSP_KEEPALIVE);
self.dispatch_task.wake();
}
} }
self.flags.set(flags); self.flags.set(flags);
} }
@ -192,7 +199,7 @@ impl Io {
pool: Cell::new(pool), pool: Cell::new(pool),
flags: Cell::new(Flags::empty()), flags: Cell::new(Flags::empty()),
error: Cell::new(None), error: Cell::new(None),
disconnect_timeout: Cell::new(Millis::ONE_SEC), disconnect_timeout: Cell::new(Seconds(1)),
dispatch_task: LocalWaker::new(), dispatch_task: LocalWaker::new(),
read_task: LocalWaker::new(), read_task: LocalWaker::new(),
write_task: LocalWaker::new(), write_task: LocalWaker::new(),
@ -230,7 +237,7 @@ impl<F> Io<F> {
#[inline] #[inline]
/// Set io disconnect timeout in millis /// Set io disconnect timeout in millis
pub fn set_disconnect_timeout(&self, timeout: Millis) { pub fn set_disconnect_timeout(&self, timeout: Seconds) {
self.0 .0.disconnect_timeout.set(timeout); self.0 .0.disconnect_timeout.set(timeout);
} }
@ -248,7 +255,7 @@ impl<F> Io<F> {
| Flags::IO_STOPPING_FILTERS, | Flags::IO_STOPPING_FILTERS,
), ),
error: Cell::new(None), error: Cell::new(None),
disconnect_timeout: Cell::new(Millis::ONE_SEC), disconnect_timeout: Cell::new(Seconds(1)),
dispatch_task: LocalWaker::new(), dispatch_task: LocalWaker::new(),
read_task: LocalWaker::new(), read_task: LocalWaker::new(),
write_task: LocalWaker::new(), write_task: LocalWaker::new(),
@ -345,10 +352,9 @@ impl<F> Io<F> {
loop { loop {
return match poll_fn(|cx| self.poll_recv(codec, cx)).await { return match poll_fn(|cx| self.poll_recv(codec, cx)).await {
Ok(item) => Ok(Some(item)), Ok(item) => Ok(Some(item)),
Err(RecvError::KeepAlive) => Err(Either::Right(io::Error::new( Err(RecvError::KeepAlive) | Err(RecvError::Timeout) => Err(Either::Right(
io::ErrorKind::Other, io::Error::new(io::ErrorKind::Other, "Keep-alive"),
"Keep-alive", )),
))),
Err(RecvError::Stop) => Err(Either::Right(io::Error::new( Err(RecvError::Stop) => Err(Either::Right(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Other,
"Dispatcher stopped", "Dispatcher stopped",
@ -515,36 +521,60 @@ impl<F> Io<F> {
where where
U: Decoder, U: Decoder,
{ {
match self.decode(codec) { let decoded = self.poll_recv_decode(codec, cx)?;
Ok(Some(el)) => Poll::Ready(Ok(el)),
Ok(None) => { if let Some(item) = decoded.item {
let flags = self.flags(); Poll::Ready(Ok(item))
if flags.contains(Flags::IO_STOPPED) { } else {
Poll::Ready(Err(RecvError::PeerGone(self.error()))) Poll::Pending
} else if flags.contains(Flags::DSP_STOP) { }
self.0 .0.remove_flags(Flags::DSP_STOP); }
Poll::Ready(Err(RecvError::Stop))
} else if flags.contains(Flags::DSP_KEEPALIVE) { #[doc(hidden)]
self.0 .0.remove_flags(Flags::DSP_KEEPALIVE); #[inline]
Poll::Ready(Err(RecvError::KeepAlive)) /// Decode codec item from incoming bytes stream.
} else if flags.contains(Flags::WR_BACKPRESSURE) { ///
Poll::Ready(Err(RecvError::WriteBackpressure)) /// Wake read task and request to read more data if data is not enough for decoding.
} else { /// If error get returned this method does not register waker for later wake up action.
match self.poll_read_ready(cx) { pub fn poll_recv_decode<U>(
Poll::Pending | Poll::Ready(Ok(Some(()))) => { &self,
log::trace!("not enough data to decode next frame"); codec: &U,
Poll::Pending cx: &mut Context<'_>,
} ) -> Result<Decoded<U::Item>, RecvError<U>>
Poll::Ready(Err(e)) => { where
Poll::Ready(Err(RecvError::PeerGone(Some(e)))) U: Decoder,
} {
Poll::Ready(Ok(None)) => { let decoded = self
Poll::Ready(Err(RecvError::PeerGone(None))) .decode_item(codec)
} .map_err(|err| RecvError::Decoder(err))?;
if decoded.item.is_some() {
Ok(decoded)
} else {
let flags = self.flags();
if flags.contains(Flags::IO_STOPPED) {
Err(RecvError::PeerGone(self.error()))
} else if flags.contains(Flags::DSP_STOP) {
self.0 .0.remove_flags(Flags::DSP_STOP);
Err(RecvError::Stop)
} else if flags.contains(Flags::DSP_KEEPALIVE) {
self.0 .0.remove_flags(Flags::DSP_KEEPALIVE);
Err(RecvError::KeepAlive)
} else if flags.contains(Flags::DSP_TIMEOUT) {
self.0 .0.remove_flags(Flags::DSP_TIMEOUT);
Err(RecvError::Timeout)
} else if flags.contains(Flags::WR_BACKPRESSURE) {
Err(RecvError::WriteBackpressure)
} else {
match self.poll_read_ready(cx) {
Poll::Pending | Poll::Ready(Ok(Some(()))) => {
log::trace!("not enough data to decode next frame");
Ok(decoded)
} }
Poll::Ready(Err(e)) => Err(RecvError::PeerGone(Some(e))),
Poll::Ready(Ok(None)) => Err(RecvError::PeerGone(None)),
} }
} }
Err(err) => Poll::Ready(Err(RecvError::Decoder(err))),
} }
} }
@ -626,6 +656,9 @@ impl<F> Io<F> {
} else if flags.contains(Flags::DSP_KEEPALIVE) { } else if flags.contains(Flags::DSP_KEEPALIVE) {
self.0 .0.remove_flags(Flags::DSP_KEEPALIVE); self.0 .0.remove_flags(Flags::DSP_KEEPALIVE);
Poll::Ready(IoStatusUpdate::KeepAlive) Poll::Ready(IoStatusUpdate::KeepAlive)
} else if flags.contains(Flags::DSP_TIMEOUT) {
self.0 .0.remove_flags(Flags::DSP_TIMEOUT);
Poll::Ready(IoStatusUpdate::Timeout)
} else if flags.contains(Flags::WR_BACKPRESSURE) { } else if flags.contains(Flags::WR_BACKPRESSURE) {
Poll::Ready(IoStatusUpdate::WriteBackpressure) Poll::Ready(IoStatusUpdate::WriteBackpressure)
} else { } else {
@ -916,7 +949,7 @@ mod tests {
let server = Io::new(server); let server = Io::new(server);
assert!(server.eq(&server)); assert!(server.eq(&server));
server.0 .0.notify_keepalive(); server.0 .0.notify_timeout(false);
let err = server.recv(&BytesCodec).await.err().unwrap(); let err = server.recv(&BytesCodec).await.err().unwrap();
assert!(format!("{:?}", err).contains("Keep-alive")); assert!(format!("{:?}", err).contains("Keep-alive"));

View file

@ -3,7 +3,7 @@ use std::{any, fmt, hash, io, time};
use ntex_bytes::{BytesVec, PoolRef}; use ntex_bytes::{BytesVec, PoolRef};
use ntex_codec::{Decoder, Encoder}; use ntex_codec::{Decoder, Encoder};
use super::{io::Flags, timer, types, Filter, IoRef, OnDisconnect, WriteBuf}; use super::{io::Flags, timer, types, Decoded, Filter, IoRef, OnDisconnect, WriteBuf};
impl IoRef { impl IoRef {
#[inline] #[inline]
@ -137,6 +137,25 @@ impl IoRef {
.with_read_destination(self, |buf| codec.decode_vec(buf)) .with_read_destination(self, |buf| codec.decode_vec(buf))
} }
#[inline]
/// Attempts to decode a frame from the read buffer
pub fn decode_item<U>(
&self,
codec: &U,
) -> Result<Decoded<<U as Decoder>::Item>, <U as Decoder>::Error>
where
U: Decoder,
{
self.0.buffer.with_read_destination(self, |buf| {
let len = buf.len();
codec.decode_vec(buf).map(|item| Decoded {
item,
remains: buf.len(),
consumed: len - buf.len(),
})
})
}
#[inline] #[inline]
/// Write bytes to a buffer and wake up write task /// Write bytes to a buffer and wake up write task
pub fn write(&self, src: &[u8]) -> io::Result<()> { pub fn write(&self, src: &[u8]) -> io::Result<()> {
@ -190,12 +209,12 @@ impl IoRef {
/// Start keep-alive timer /// Start keep-alive timer
pub fn start_keepalive_timer(&self, timeout: time::Duration) { pub fn start_keepalive_timer(&self, timeout: time::Duration) {
if self.flags().contains(Flags::KEEPALIVE) { if self.flags().contains(Flags::KEEPALIVE) {
timer::unregister(self.0.keepalive.get(), self); timer::unregister(self.0.keepalive.get(), self, false);
} }
if !timeout.is_zero() { if !timeout.is_zero() {
log::debug!("start keep-alive timeout {:?}", timeout); log::debug!("start keep-alive timeout {:?}", timeout);
self.0.insert_flags(Flags::KEEPALIVE); self.0.insert_flags(Flags::KEEPALIVE);
self.0.keepalive.set(timer::register(timeout, self)); self.0.keepalive.set(timer::register(timeout, self, false));
} else { } else {
self.0.remove_flags(Flags::KEEPALIVE); self.0.remove_flags(Flags::KEEPALIVE);
} }
@ -206,10 +225,24 @@ impl IoRef {
pub fn stop_keepalive_timer(&self) { pub fn stop_keepalive_timer(&self) {
if self.flags().contains(Flags::KEEPALIVE) { if self.flags().contains(Flags::KEEPALIVE) {
log::debug!("unregister keep-alive timeout"); log::debug!("unregister keep-alive timeout");
timer::unregister(self.0.keepalive.get(), self) timer::unregister(self.0.keepalive.get(), self, false)
} }
} }
#[inline]
/// Start custom timer
pub fn start_timer(&self, timeout: time::Duration) -> time::Instant {
log::debug!("start custom timeout: {:?}", timeout);
timer::register(timeout, self, true)
}
#[inline]
/// Stop custom timer
pub fn stop_timer(&self, id: time::Instant) {
log::debug!("unregister custom timeout");
timer::unregister(id, self, true)
}
#[inline] #[inline]
/// Notify when io stream get disconnected /// Notify when io stream get disconnected
pub fn on_disconnect(&self) -> OnDisconnect { pub fn on_disconnect(&self) -> OnDisconnect {
@ -305,6 +338,11 @@ mod tests {
let buf = client.read().await.unwrap(); let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"test")); assert_eq!(buf, Bytes::from_static(b"test"));
client.write(b"test");
state.read_ready().await.unwrap();
let buf = state.decode(&BytesCodec).unwrap().unwrap();
assert_eq!(buf, Bytes::from_static(b"test"));
client.write_error(io::Error::new(io::ErrorKind::Other, "err")); client.write_error(io::Error::new(io::ErrorKind::Other, "err"));
let res = state.send(Bytes::from_static(b"test"), &BytesCodec).await; let res = state.send(Bytes::from_static(b"test"), &BytesCodec).await;
assert!(res.is_err()); assert!(res.is_err());
@ -409,13 +447,15 @@ mod tests {
let write_order = Rc::new(RefCell::new(Vec::new())); let write_order = Rc::new(RefCell::new(Vec::new()));
let (client, server) = IoTest::create(); let (client, server) = IoTest::create();
let io = Io::new(server).add_filter(Counter { let counter = Counter {
idx: 1, idx: 1,
in_bytes: in_bytes.clone(), in_bytes: in_bytes.clone(),
out_bytes: out_bytes.clone(), out_bytes: out_bytes.clone(),
read_order: read_order.clone(), read_order: read_order.clone(),
write_order: write_order.clone(), write_order: write_order.clone(),
}); };
format!("{:?}", counter);
let io = Io::new(server).add_filter(counter);
client.remote_buffer_cap(1024); client.remote_buffer_cap(1024);
client.write(TEXT); client.write(TEXT);

View file

@ -24,13 +24,13 @@ use ntex_codec::{Decoder, Encoder};
use ntex_util::time::Millis; use ntex_util::time::Millis;
pub use self::buf::{ReadBuf, WriteBuf}; pub use self::buf::{ReadBuf, WriteBuf};
pub use self::dispatcher::Dispatcher; pub use self::dispatcher::{Dispatcher, DispatcherConfig};
pub use self::filter::{Base, Filter, Layer}; pub use self::filter::{Base, Filter, Layer};
pub use self::framed::Framed; pub use self::framed::Framed;
pub use self::io::{Io, IoRef, OnDisconnect}; pub use self::io::{Io, IoRef, OnDisconnect};
pub use self::seal::{IoBoxed, Sealed}; pub use self::seal::{IoBoxed, Sealed};
pub use self::tasks::{ReadContext, WriteContext}; pub use self::tasks::{ReadContext, WriteContext};
pub use self::utils::{filter, seal}; pub use self::utils::{filter, seal, Decoded};
/// Status for read task /// Status for read task
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
@ -117,6 +117,8 @@ pub trait Handle {
pub enum IoStatusUpdate { pub enum IoStatusUpdate {
/// Keep-alive timeout occured /// Keep-alive timeout occured
KeepAlive, KeepAlive,
/// Custom timeout occured
Timeout,
/// Write backpressure is enabled /// Write backpressure is enabled
WriteBackpressure, WriteBackpressure,
/// Stop io stream handling /// Stop io stream handling
@ -130,6 +132,8 @@ pub enum IoStatusUpdate {
pub enum RecvError<U: Decoder> { pub enum RecvError<U: Decoder> {
/// Keep-alive timeout occured /// Keep-alive timeout occured
KeepAlive, KeepAlive,
/// Custom timeout occured
Timeout,
/// Write backpressure is enabled /// Write backpressure is enabled
WriteBackpressure, WriteBackpressure,
/// Stop io stream handling /// Stop io stream handling
@ -149,6 +153,8 @@ pub enum DispatchItem<U: Encoder + Decoder> {
WBackPressureDisabled, WBackPressureDisabled,
/// Keep alive timeout /// Keep alive timeout
KeepAliveTimeout, KeepAliveTimeout,
/// Frame read timeout
ReadTimeout,
/// Decoder parse error /// Decoder parse error
DecoderError(<U as Decoder>::Error), DecoderError(<U as Decoder>::Error),
/// Encoder parse error /// Encoder parse error
@ -176,6 +182,9 @@ where
DispatchItem::KeepAliveTimeout => { DispatchItem::KeepAliveTimeout => {
write!(fmt, "DispatchItem::KeepAliveTimeout") write!(fmt, "DispatchItem::KeepAliveTimeout")
} }
DispatchItem::ReadTimeout => {
write!(fmt, "DispatchItem::ReadTimeout")
}
DispatchItem::EncoderError(ref e) => { DispatchItem::EncoderError(ref e) => {
write!(fmt, "DispatchItem::EncoderError({:?})", e) write!(fmt, "DispatchItem::EncoderError({:?})", e)
} }
@ -213,5 +222,6 @@ mod tests {
assert!( assert!(
format!("{:?}", T::KeepAliveTimeout).contains("DispatchItem::KeepAliveTimeout") format!("{:?}", T::KeepAliveTimeout).contains("DispatchItem::KeepAliveTimeout")
); );
assert!(format!("{:?}", T::ReadTimeout).contains("DispatchItem::ReadTimeout"));
} }
} }

View file

@ -1,45 +1,76 @@
use std::{cell::RefCell, collections::BTreeMap, rc::Rc, time::Duration, time::Instant}; #![allow(clippy::mutable_key_type)]
use std::collections::{BTreeMap, VecDeque};
use std::{cell::RefCell, rc::Rc, time::Duration, time::Instant};
use ntex_util::time::{now, sleep, Millis}; use ntex_util::time::{now, sleep, Millis};
use ntex_util::{spawn, HashSet}; use ntex_util::{spawn, HashSet};
use crate::{io::IoState, IoRef}; use crate::{io::IoState, IoRef};
const CAP: usize = 64;
const SEC: Duration = Duration::from_secs(1);
thread_local! { thread_local! {
static TIMER: Rc<RefCell<Inner>> = Rc::new(RefCell::new( static TIMER: Rc<RefCell<Inner>> = Rc::new(RefCell::new(
Inner { Inner {
running: false, running: false,
cache: VecDeque::with_capacity(CAP),
notifications: BTreeMap::default(), notifications: BTreeMap::default(),
})); }));
} }
type Notifications = BTreeMap<Instant, (HashSet<Rc<IoState>>, HashSet<Rc<IoState>>)>;
struct Inner { struct Inner {
running: bool, running: bool,
notifications: BTreeMap<Instant, HashSet<Rc<IoState>>>, cache: VecDeque<HashSet<Rc<IoState>>>,
notifications: Notifications,
} }
impl Inner { impl Inner {
fn unregister(&mut self, expire: Instant, io: &IoRef) { fn unregister(&mut self, expire: Instant, io: &IoRef, custom: bool) {
if let Some(states) = self.notifications.get_mut(&expire) { if let Some(states) = self.notifications.get_mut(&expire) {
states.remove(&io.0); if custom {
if states.is_empty() { states.1.remove(&io.0);
self.notifications.remove(&expire); } else {
states.0.remove(&io.0);
}
if states.0.is_empty() && states.1.is_empty() {
if let Some(items) = self.notifications.remove(&expire) {
if self.cache.len() <= CAP {
self.cache.push_back(items.0);
self.cache.push_back(items.1);
}
}
} }
} }
} }
} }
pub(crate) fn register(timeout: Duration, io: &IoRef) -> Instant { pub(crate) fn register(timeout: Duration, io: &IoRef, custom: bool) -> Instant {
let expire = now() + timeout;
TIMER.with(|timer| { TIMER.with(|timer| {
let mut inner = timer.borrow_mut(); let mut inner = timer.borrow_mut();
inner let expire = now() + timeout;
.notifications
.entry(expire) // search existing key
.or_default() let expire = if let Some((expire, _)) =
.insert(io.0.clone()); inner.notifications.range(expire..expire + SEC).next()
{
*expire
} else {
let n0 = inner.cache.pop_front().unwrap_or_default();
let n1 = inner.cache.pop_front().unwrap_or_default();
inner.notifications.insert(expire, (n0, n1));
expire
};
let notifications = inner.notifications.get_mut(&expire).unwrap();
if custom {
notifications.1.insert(io.0.clone());
} else {
notifications.0.insert(io.0.clone());
};
if !inner.running { if !inner.running {
inner.running = true; inner.running = true;
@ -57,8 +88,12 @@ pub(crate) fn register(timeout: Duration, io: &IoRef) -> Instant {
while let Some(key) = i.notifications.keys().next() { while let Some(key) = i.notifications.keys().next() {
let key = *key; let key = *key;
if key <= now_time { if key <= now_time {
for st in i.notifications.remove(&key).unwrap() { let mut items = i.notifications.remove(&key).unwrap();
st.notify_keepalive(); items.0.drain().for_each(|st| st.notify_timeout(false));
items.1.drain().for_each(|st| st.notify_timeout(true));
if i.cache.len() <= CAP {
i.cache.push_back(items.0);
i.cache.push_back(items.1);
} }
} else { } else {
break; break;
@ -75,21 +110,23 @@ pub(crate) fn register(timeout: Duration, io: &IoRef) -> Instant {
drop(guard); drop(guard);
}); });
} }
});
expire expire
})
} }
struct TimerGuard(Rc<RefCell<Inner>>); struct TimerGuard(Rc<RefCell<Inner>>);
impl Drop for TimerGuard { impl Drop for TimerGuard {
fn drop(&mut self) { fn drop(&mut self) {
self.0.borrow_mut().running = false; let mut inner = self.0.borrow_mut();
inner.running = false;
inner.notifications.clear();
} }
} }
pub(crate) fn unregister(expire: Instant, io: &IoRef) { pub(crate) fn unregister(expire: Instant, io: &IoRef, custom: bool) {
TIMER.with(|timer| { TIMER.with(|timer| {
timer.borrow_mut().unregister(expire, io); timer.borrow_mut().unregister(expire, io, custom);
}) })
} }

View file

@ -5,6 +5,15 @@ use ntex_util::future::Ready;
use crate::{Filter, FilterFactory, Io, IoBoxed, Layer}; use crate::{Filter, FilterFactory, Io, IoBoxed, Layer};
/// Decoded item from buffer
#[doc(hidden)]
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Decoded<T> {
pub item: Option<T>,
pub remains: usize,
pub consumed: usize,
}
/// Service that converts any Io<F> stream to IoBoxed stream /// Service that converts any Io<F> stream to IoBoxed stream
pub fn seal<F, S, C>( pub fn seal<F, S, C>(
srv: S, srv: S,
@ -176,6 +185,14 @@ mod tests {
.unwrap(); .unwrap();
let _ = svc.call(Io::new(server)).await; let _ = svc.call(Io::new(server)).await;
let (client, _) = IoTest::create();
let io = Io::new(client);
format!("{:?}", TestFilter);
let mut s = Stack::new();
s.add_layer();
let _ = s.read_buf(&io, 0, 0, |b| TestFilter.process_read_buf(b));
let _ = s.write_buf(&io, 0, |b| TestFilter.process_write_buf(b));
} }
#[ntex::test] #[ntex::test]

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.7.9] - 2023-11-10
* Update ntex io
## [0.7.8] - 2023-11-06 ## [0.7.8] - 2023-11-06
* Stopping Server does not release resources #233 * Stopping Server does not release resources #233

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex" name = "ntex"
version = "0.7.8" version = "0.7.9"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services" description = "Framework for composable network services"
readme = "README.md" readme = "README.md"
@ -52,13 +52,13 @@ ntex-codec = "0.6.2"
ntex-connect = "0.3.2" ntex-connect = "0.3.2"
ntex-http = "0.1.10" ntex-http = "0.1.10"
ntex-router = "0.5.2" ntex-router = "0.5.2"
ntex-service = "1.2.6" ntex-service = "1.2.7"
ntex-macros = "0.1.3" ntex-macros = "0.1.3"
ntex-util = "0.3.3" ntex-util = "0.3.4"
ntex-bytes = "0.1.19" ntex-bytes = "0.1.20"
ntex-h2 = "0.4.3" ntex-h2 = "0.4.4"
ntex-rt = "0.4.10" ntex-rt = "0.4.10"
ntex-io = "0.3.5" ntex-io = "0.3.6"
ntex-tls = "0.3.2" ntex-tls = "0.3.2"
ntex-tokio = { version = "0.3.0", optional = true } ntex-tokio = { version = "0.3.0", optional = true }
ntex-glommio = { version = "0.3.0", optional = true } ntex-glommio = { version = "0.3.0", optional = true }

View file

@ -36,7 +36,7 @@ pub struct Connector {
timeout: Millis, timeout: Millis,
conn_lifetime: Duration, conn_lifetime: Duration,
conn_keep_alive: Duration, conn_keep_alive: Duration,
disconnect_timeout: Millis, disconnect_timeout: Seconds,
limit: usize, limit: usize,
h2config: h2::Config, h2config: h2::Config,
connector: BoxedConnector, connector: BoxedConnector,
@ -62,7 +62,7 @@ impl Connector {
timeout: Millis(1_000), timeout: Millis(1_000),
conn_lifetime: Duration::from_secs(75), conn_lifetime: Duration::from_secs(75),
conn_keep_alive: Duration::from_secs(15), conn_keep_alive: Duration::from_secs(15),
disconnect_timeout: Millis(3_000), disconnect_timeout: Seconds(3),
limit: 100, limit: 100,
h2config: h2::Config::client(), h2config: h2::Config::client(),
}; };
@ -171,7 +171,7 @@ impl Connector {
/// To disable timeout set value to 0. /// To disable timeout set value to 0.
/// ///
/// By default disconnect timeout is set to 3 seconds. /// By default disconnect timeout is set to 3 seconds.
pub fn disconnect_timeout<T: Into<Millis>>(mut self, timeout: T) -> Self { pub fn disconnect_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
self.disconnect_timeout = timeout.into(); self.disconnect_timeout = timeout.into();
self self
} }
@ -256,7 +256,7 @@ impl Connector {
fn connector( fn connector(
connector: BoxedConnector, connector: BoxedConnector,
timeout: Millis, timeout: Millis,
disconnect_timeout: Millis, disconnect_timeout: Seconds,
) -> impl Service<Connect, Response = IoBoxed, Error = ConnectError> + fmt::Debug { ) -> impl Service<Connect, Response = IoBoxed, Error = ConnectError> + fmt::Debug {
TimeoutService::new( TimeoutService::new(
timeout, timeout,

View file

@ -172,6 +172,9 @@ impl Stream for PlStream {
Err(RecvError::KeepAlive) => { Err(RecvError::KeepAlive) => {
Err(io::Error::new(io::ErrorKind::Other, "Keep-alive").into()) Err(io::Error::new(io::ErrorKind::Other, "Keep-alive").into())
} }
Err(RecvError::Timeout) => {
Err(io::Error::new(io::ErrorKind::TimedOut, "Read timeout").into())
}
Err(RecvError::Stop) => { Err(RecvError::Stop) => {
Err(io::Error::new(io::ErrorKind::Other, "Dispatcher stopped") Err(io::Error::new(io::ErrorKind::Other, "Dispatcher stopped")
.into()) .into())

View file

@ -7,7 +7,7 @@ use ntex_h2::{self as h2};
use crate::http::uri::{Authority, Scheme, Uri}; use crate::http::uri::{Authority, Scheme, Uri};
use crate::io::{types::HttpProtocol, IoBoxed}; use crate::io::{types::HttpProtocol, IoBoxed};
use crate::service::{Pipeline, PipelineCall, Service, ServiceCtx}; use crate::service::{Pipeline, PipelineCall, Service, ServiceCtx};
use crate::time::{now, Millis}; use crate::time::{now, Seconds};
use crate::util::{ready, BoxFuture, ByteString, HashMap, HashSet}; use crate::util::{ready, BoxFuture, ByteString, HashMap, HashSet};
use crate::{channel::pool, rt::spawn, task::LocalWaker}; use crate::{channel::pool, rt::spawn, task::LocalWaker};
@ -57,7 +57,7 @@ where
connector: T, connector: T,
conn_lifetime: Duration, conn_lifetime: Duration,
conn_keep_alive: Duration, conn_keep_alive: Duration,
disconnect_timeout: Millis, disconnect_timeout: Seconds,
limit: usize, limit: usize,
h2config: h2::Config, h2config: h2::Config,
) -> Self { ) -> Self {
@ -178,7 +178,7 @@ where
pub(super) struct Inner { pub(super) struct Inner {
conn_lifetime: Duration, conn_lifetime: Duration,
conn_keep_alive: Duration, conn_keep_alive: Duration,
disconnect_timeout: Millis, disconnect_timeout: Seconds,
limit: usize, limit: usize,
h2config: h2::Config, h2config: h2::Config,
acquired: usize, acquired: usize,
@ -396,7 +396,7 @@ pin_project_lite::pin_project! {
uri: Uri, uri: Uri,
tx: Option<Waiter>, tx: Option<Waiter>,
guard: Option<OpenGuard>, guard: Option<OpenGuard>,
disconnect_timeout: Millis, disconnect_timeout: Seconds,
inner: Rc<RefCell<Inner>>, inner: Rc<RefCell<Inner>>,
} }
} }
@ -612,9 +612,8 @@ mod tests {
use std::{cell::RefCell, rc::Rc}; use std::{cell::RefCell, rc::Rc};
use super::*; use super::*;
use crate::{ use crate::time::{sleep, Millis};
http::Uri, io as nio, service::fn_service, testing::Io, time::sleep, util::lazy, use crate::{http::Uri, io as nio, service::fn_service, testing::Io, util::lazy};
};
#[crate::rt_test] #[crate::rt_test]
async fn test_basics() { async fn test_basics() {
@ -630,7 +629,7 @@ mod tests {
}), }),
Duration::from_secs(10), Duration::from_secs(10),
Duration::from_secs(10), Duration::from_secs(10),
Millis::ZERO, Seconds::ZERO,
1, 1,
h2::Config::client(), h2::Config::client(),
) )

View file

@ -110,7 +110,7 @@ where
/// Construct new `Dispatcher` instance with outgoing messages stream. /// Construct new `Dispatcher` instance with outgoing messages stream.
pub(in crate::http) fn new(io: Io<F>, config: Rc<DispatcherConfig<S, X, U>>) -> Self { pub(in crate::http) fn new(io: Io<F>, config: Rc<DispatcherConfig<S, X, U>>) -> Self {
let codec = Codec::new(config.timer.clone(), config.keep_alive_enabled()); let codec = Codec::new(config.timer.clone(), config.keep_alive_enabled());
io.set_disconnect_timeout(config.client_disconnect.into()); io.set_disconnect_timeout(config.client_disconnect);
// slow-request timer // slow-request timer
let flags = if config.client_timeout.is_zero() { let flags = if config.client_timeout.is_zero() {
@ -601,7 +601,7 @@ where
log::trace!("dispatcher is instructed to stop"); log::trace!("dispatcher is instructed to stop");
Poll::Ready(State::Stop) Poll::Ready(State::Stop)
} }
Err(RecvError::KeepAlive) => { Err(RecvError::KeepAlive) | Err(RecvError::Timeout) => {
// keep-alive timeout // keep-alive timeout
if !self.flags.contains(Flags::STARTED) { if !self.flags.contains(Flags::STARTED) {
log::trace!("slow request timeout"); log::trace!("slow request timeout");
@ -706,6 +706,7 @@ where
Poll::Pending => false, Poll::Pending => false,
Poll::Ready( Poll::Ready(
IoStatusUpdate::KeepAlive IoStatusUpdate::KeepAlive
| IoStatusUpdate::Timeout
| IoStatusUpdate::Stop | IoStatusUpdate::Stop
| IoStatusUpdate::PeerGone(_), | IoStatusUpdate::PeerGone(_),
) => true, ) => true,
@ -756,6 +757,12 @@ fn _poll_request_payload<F>(
*slf_payload = None; *slf_payload = None;
io::Error::new(io::ErrorKind::Other, "Keep-alive").into() io::Error::new(io::ErrorKind::Other, "Keep-alive").into()
} }
RecvError::Timeout => {
payload.1.set_error(PayloadError::EncodingCorrupted);
*slf_payload = None;
io::Error::new(io::ErrorKind::TimedOut, "Read timeout")
.into()
}
RecvError::Stop => { RecvError::Stop => {
payload.1.set_error(PayloadError::EncodingCorrupted); payload.1.set_error(PayloadError::EncodingCorrupted);
*slf_payload = None; *slf_payload = None;

View file

@ -201,7 +201,7 @@ where
X: 'static, X: 'static,
U: 'static, U: 'static,
{ {
io.set_disconnect_timeout(config.client_disconnect.into()); io.set_disconnect_timeout(config.client_disconnect);
let ioref = io.get_ref(); let ioref = io.get_ref();
let _ = server::handle_one( let _ = server::handle_one(

View file

@ -711,7 +711,7 @@ where
.lifetime(Seconds::ZERO) .lifetime(Seconds::ZERO)
.keep_alive(Seconds(30)) .keep_alive(Seconds(30))
.timeout(Millis(30_000)) .timeout(Millis(30_000))
.disconnect_timeout(Millis(5_000)) .disconnect_timeout(Seconds(5))
.openssl(builder.build()) .openssl(builder.build())
.finish() .finish()
} }

View file

@ -48,6 +48,9 @@ where
DispatchItem::KeepAliveTimeout => { DispatchItem::KeepAliveTimeout => {
Either::Right(Ready::Err(WsError::KeepAlive)) Either::Right(Ready::Err(WsError::KeepAlive))
} }
DispatchItem::ReadTimeout => {
Either::Right(Ready::Err(WsError::ReadTimeout))
}
DispatchItem::DecoderError(e) | DispatchItem::EncoderError(e) => { DispatchItem::DecoderError(e) | DispatchItem::EncoderError(e) => {
Either::Right(Ready::Err(WsError::Protocol(e))) Either::Right(Ready::Err(WsError::Protocol(e)))
} }
@ -97,11 +100,12 @@ where
// create ws service // create ws service
let srv = factory.into_factory().create(sink.clone()).await?; let srv = factory.into_factory().create(sink.clone()).await?;
let cfg = crate::io::DispatcherConfig::default();
cfg.set_keepalive_timeout(Seconds::ZERO);
// start websockets service dispatcher // start websockets service dispatcher
rt::spawn(async move { rt::spawn(async move {
let res = crate::io::Dispatcher::new(io, codec, srv) let res = crate::io::Dispatcher::with_config(io, codec, srv, &cfg).await;
.keepalive_timeout(Seconds::ZERO)
.await;
log::trace!("Ws handler is terminated: {:?}", res); log::trace!("Ws handler is terminated: {:?}", res);
}); });

View file

@ -15,7 +15,9 @@ use crate::connect::{Connect, ConnectError, Connector};
use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue, AUTHORIZATION}; use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue, AUTHORIZATION};
use crate::http::{body::BodySize, client::ClientResponse, error::HttpError, h1}; use crate::http::{body::BodySize, client::ClientResponse, error::HttpError, h1};
use crate::http::{ConnectionType, RequestHead, RequestHeadType, StatusCode, Uri}; use crate::http::{ConnectionType, RequestHead, RequestHeadType, StatusCode, Uri};
use crate::io::{Base, DispatchItem, Dispatcher, Filter, Io, Layer, Sealed}; use crate::io::{
Base, DispatchItem, Dispatcher, DispatcherConfig, Filter, Io, Layer, Sealed,
};
use crate::service::{apply_fn, into_service, IntoService, Pipeline, Service}; use crate::service::{apply_fn, into_service, IntoService, Pipeline, Service};
use crate::time::{timeout, Millis, Seconds}; use crate::time::{timeout, Millis, Seconds};
use crate::{channel::mpsc, rt, util::Ready, ws}; use crate::{channel::mpsc, rt, util::Ready, ws};
@ -31,8 +33,8 @@ pub struct WsClient<F, T> {
max_size: usize, max_size: usize,
server_mode: bool, server_mode: bool,
timeout: Millis, timeout: Millis,
keepalive_timeout: Seconds,
extra_headers: RefCell<Option<HeaderMap>>, extra_headers: RefCell<Option<HeaderMap>>,
config: DispatcherConfig,
_t: marker::PhantomData<F>, _t: marker::PhantomData<F>,
} }
@ -53,7 +55,7 @@ struct Inner<F, T> {
max_size: usize, max_size: usize,
server_mode: bool, server_mode: bool,
timeout: Millis, timeout: Millis,
keepalive_timeout: Seconds, config: DispatcherConfig,
_t: marker::PhantomData<F>, _t: marker::PhantomData<F>,
} }
@ -136,7 +138,6 @@ where
let max_size = self.max_size; let max_size = self.max_size;
let server_mode = self.server_mode; let server_mode = self.server_mode;
let to = self.timeout; let to = self.timeout;
let keepalive_timeout = self.keepalive_timeout;
let mut headers = self.extra_headers.borrow_mut().take().unwrap_or_default(); let mut headers = self.extra_headers.borrow_mut().take().unwrap_or_default();
// Generate a random key for the `Sec-WebSocket-Key` header. // Generate a random key for the `Sec-WebSocket-Key` header.
@ -248,7 +249,7 @@ where
} else { } else {
ws::Codec::new().max_size(max_size).client_mode() ws::Codec::new().max_size(max_size).client_mode()
}, },
keepalive_timeout, self.config.clone(),
)) ))
} }
} }
@ -282,18 +283,22 @@ impl WsClientBuilder<Base, ()> {
Err(e) => (Default::default(), Some(e.into())), Err(e) => (Default::default(), Some(e.into())),
}; };
let config = DispatcherConfig::default()
.set_keepalive_timeout(Seconds(600))
.clone();
WsClientBuilder { WsClientBuilder {
err, err,
origin: None, origin: None,
protocols: None, protocols: None,
inner: Some(Inner { inner: Some(Inner {
head, head,
config,
connector: Connector::<Uri>::default(), connector: Connector::<Uri>::default(),
addr: None, addr: None,
max_size: 65_536, max_size: 65_536,
server_mode: false, server_mode: false,
timeout: Millis(5_000), timeout: Millis(5_000),
keepalive_timeout: Seconds(600),
_t: marker::PhantomData, _t: marker::PhantomData,
}), }),
#[cfg(feature = "cookie")] #[cfg(feature = "cookie")]
@ -486,7 +491,7 @@ where
/// By default keep-alive timeout is set to 600 seconds. /// By default keep-alive timeout is set to 600 seconds.
pub fn keepalive_timeout(&mut self, timeout: Seconds) -> &mut Self { pub fn keepalive_timeout(&mut self, timeout: Seconds) -> &mut Self {
if let Some(parts) = parts(&mut self.inner, &self.err) { if let Some(parts) = parts(&mut self.inner, &self.err) {
parts.keepalive_timeout = timeout; parts.config.set_keepalive_timeout(timeout);
} }
self self
} }
@ -507,7 +512,7 @@ where
max_size: inner.max_size, max_size: inner.max_size,
server_mode: inner.server_mode, server_mode: inner.server_mode,
timeout: inner.timeout, timeout: inner.timeout,
keepalive_timeout: inner.keepalive_timeout, config: inner.config,
_t: marker::PhantomData, _t: marker::PhantomData,
}), }),
err: self.err.take(), err: self.err.take(),
@ -632,7 +637,7 @@ where
max_size: inner.max_size, max_size: inner.max_size,
server_mode: inner.server_mode, server_mode: inner.server_mode,
timeout: inner.timeout, timeout: inner.timeout,
keepalive_timeout: inner.keepalive_timeout, config: inner.config,
extra_headers: RefCell::new(None), extra_headers: RefCell::new(None),
_t: marker::PhantomData, _t: marker::PhantomData,
}) })
@ -673,7 +678,7 @@ pub struct WsConnection<F> {
io: Io<F>, io: Io<F>,
codec: ws::Codec, codec: ws::Codec,
res: ClientResponse, res: ClientResponse,
keepalive_timeout: Seconds, config: DispatcherConfig,
} }
impl<F> WsConnection<F> { impl<F> WsConnection<F> {
@ -681,13 +686,13 @@ impl<F> WsConnection<F> {
io: Io<F>, io: Io<F>,
res: ClientResponse, res: ClientResponse,
codec: ws::Codec, codec: ws::Codec,
keepalive_timeout: Seconds, config: DispatcherConfig,
) -> Self { ) -> Self {
Self { Self {
io, io,
codec, codec,
res, res,
keepalive_timeout, config,
} }
} }
@ -757,6 +762,7 @@ impl WsConnection<Sealed> {
DispatchItem::WBackPressureEnabled DispatchItem::WBackPressureEnabled
| DispatchItem::WBackPressureDisabled => Ok(None), | DispatchItem::WBackPressureDisabled => Ok(None),
DispatchItem::KeepAliveTimeout => Err(WsError::KeepAlive), DispatchItem::KeepAliveTimeout => Err(WsError::KeepAlive),
DispatchItem::ReadTimeout => Err(WsError::ReadTimeout),
DispatchItem::DecoderError(e) | DispatchItem::EncoderError(e) => { DispatchItem::DecoderError(e) | DispatchItem::EncoderError(e) => {
Err(WsError::Protocol(e)) Err(WsError::Protocol(e))
} }
@ -765,9 +771,7 @@ impl WsConnection<Sealed> {
}, },
); );
Dispatcher::new(self.io, self.codec, service) Dispatcher::with_config(self.io, self.codec, service, &self.config).await
.keepalive_timeout(self.keepalive_timeout)
.await
} }
} }
@ -778,7 +782,7 @@ impl<F: Filter> WsConnection<F> {
io: self.io.seal(), io: self.io.seal(),
codec: self.codec, codec: self.codec,
res: self.res, res: self.res,
keepalive_timeout: self.keepalive_timeout, config: self.config,
} }
} }

View file

@ -17,6 +17,9 @@ pub enum WsError<E> {
/// Keep-alive error /// Keep-alive error
#[error("Keep-alive error")] #[error("Keep-alive error")]
KeepAlive, KeepAlive,
/// Frame read timeout
#[error("Frame read timeout")]
ReadTimeout,
/// Ws protocol level error /// Ws protocol level error
#[error("Ws protocol level error")] #[error("Ws protocol level error")]
Protocol(ProtocolError), Protocol(ProtocolError),