Add Io::poll_status_update() method to use instead of register_dispatcher()

This commit is contained in:
Nikolay Kim 2021-12-30 16:42:43 +06:00
parent f91c50854e
commit ca72be32e9
25 changed files with 236 additions and 196 deletions

View file

@ -1,5 +1,11 @@
# Changes # Changes
## [0.1.0] - 2021-12-30
* Add Io::poll_status_update() method to use instead of register_dispatcher()
* Reset DSP_STOP and DSP_KEEPALIVE flags
## [0.1.0-b.10] - 2021-12-30 ## [0.1.0-b.10] - 2021-12-30
* IoRef::close() method initiates io stream shutdown * IoRef::close() method initiates io stream shutdown

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-io" name = "ntex-io"
version = "0.1.0-b.10" version = "0.1.0"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames" description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -31,7 +31,7 @@ async-std = ["async_std/unstable"]
ntex-codec = "0.6.0" ntex-codec = "0.6.0"
ntex-bytes = "0.1.8" ntex-bytes = "0.1.8"
ntex-util = "0.1.5" ntex-util = "0.1.5"
ntex-service = "0.3.0-b.0" ntex-service = "0.3.0"
bitflags = "1.3" bitflags = "1.3"
fxhash = "0.2.1" fxhash = "0.2.1"
@ -42,6 +42,6 @@ tok-io = { version = "1", package = "tokio", default-features = false, optional
async_std = { version = "1", package = "async-std", optional = true } async_std = { version = "1", package = "async-std", optional = true }
[dev-dependencies] [dev-dependencies]
ntex = "0.5.0-b.5" ntex = "0.5.0"
rand = "0.8" rand = "0.8"
env_logger = "0.9" env_logger = "0.9"

View file

@ -7,12 +7,12 @@ use ntex_service::{IntoService, Service};
use ntex_util::time::{now, Seconds}; use ntex_util::time::{now, Seconds};
use ntex_util::{future::Either, ready}; use ntex_util::{future::Either, ready};
use crate::{rt::spawn, DispatchItem, IoBoxed, IoRef, RecvError, Timer}; use crate::{rt::spawn, DispatchItem, IoBoxed, IoRef, IoStatusUpdate, RecvError, Timer};
type Response<U> = <U as Encoder>::Item; type Response<U> = <U as Encoder>::Item;
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
/// Framed dispatcher - is a future that reads frames from bytes stream /// Dispatcher - is a future that reads frames from bytes stream
/// and pass then to the service. /// and pass then to the service.
pub struct Dispatcher<S, U> pub struct Dispatcher<S, U>
where where
@ -28,6 +28,13 @@ pin_project_lite::pin_project! {
} }
} }
bitflags::bitflags! {
struct Flags: u8 {
const READY_ERR = 0b0001;
const IO_ERR = 0b0010;
}
}
struct DispatcherInner<S, U> struct DispatcherInner<S, U>
where where
S: Service<DispatchItem<U>, Response = Option<Response<U>>>, S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
@ -39,7 +46,7 @@ where
ka_timeout: Cell<Seconds>, ka_timeout: Cell<Seconds>,
ka_updated: Cell<time::Instant>, ka_updated: Cell<time::Instant>,
error: Cell<Option<S::Error>>, error: Cell<Option<S::Error>>,
ready_err: Cell<bool>, flags: Cell<Flags>,
shared: Rc<DispatcherShared<S, U>>, shared: Rc<DispatcherShared<S, U>>,
pool: Pool, pool: Pool,
} }
@ -112,7 +119,7 @@ where
pool: io.memory_pool().pool(), pool: io.memory_pool().pool(),
ka_updated: Cell::new(updated), ka_updated: Cell::new(updated),
error: Cell::new(None), error: Cell::new(None),
ready_err: Cell::new(false), flags: Cell::new(Flags::empty()),
st: Cell::new(DispatcherState::Processing), st: Cell::new(DispatcherState::Processing),
shared: Rc::new(DispatcherShared { shared: Rc::new(DispatcherShared {
codec, codec,
@ -304,7 +311,7 @@ where
slf.unregister_keepalive(); slf.unregister_keepalive();
// service may relay on poll_ready for response results // service may relay on poll_ready for response results
if !this.inner.ready_err.get() { if !slf.flags.get().contains(Flags::READY_ERR) {
let _ = this.service.poll_ready(cx); let _ = this.service.poll_ready(cx);
} }
@ -313,8 +320,21 @@ where
slf.st.set(DispatcherState::Shutdown); slf.st.set(DispatcherState::Shutdown);
continue; continue;
} }
} else { } else if !slf.flags.get().contains(Flags::IO_ERR) {
slf.io.register_dispatcher(cx); match ready!(slf.io.poll_status_update(cx)) {
IoStatusUpdate::PeerGone(_)
| IoStatusUpdate::Stop
| IoStatusUpdate::KeepAlive => {
slf.insert_flags(Flags::IO_ERR);
continue;
}
IoStatusUpdate::WriteBackpressure => {
if ready!(slf.io.poll_flush(cx, true)).is_err() {
slf.insert_flags(Flags::IO_ERR);
}
continue;
}
}
} }
return Poll::Pending; return Poll::Pending;
} }
@ -410,7 +430,7 @@ where
log::trace!("service readiness check failed, stopping"); log::trace!("service readiness check failed, stopping");
self.st.set(DispatcherState::Stop); self.st.set(DispatcherState::Stop);
self.error.set(Some(err)); self.error.set(Some(err));
self.ready_err.set(true); self.insert_flags(Flags::READY_ERR);
Poll::Ready(PollService::ServiceError) Poll::Ready(PollService::ServiceError)
} }
} }
@ -424,6 +444,12 @@ where
self.ka_timeout.get().non_zero() self.ka_timeout.get().non_zero()
} }
fn insert_flags(&self, f: Flags) {
let mut flags = self.flags.get();
flags.insert(f);
self.flags.set(flags)
}
/// update keep-alive timer /// update keep-alive timer
fn update_keepalive(&self) { fn update_keepalive(&self) {
if self.ka_enabled() { if self.ka_enabled() {
@ -518,7 +544,7 @@ mod tests {
inner: DispatcherInner { inner: DispatcherInner {
ka_updated: Cell::new(ka_updated), ka_updated: Cell::new(ka_updated),
error: Cell::new(None), error: Cell::new(None),
ready_err: Cell::new(false), flags: Cell::new(super::Flags::empty()),
st: Cell::new(DispatcherState::Processing), st: Cell::new(DispatcherState::Processing),
pool: state.memory_pool().pool(), pool: state.memory_pool().pool(),
io: state.into(), io: state.into(),
@ -753,7 +779,6 @@ mod tests {
assert_eq!(client.remote_buffer(|buf| buf.len()), 0); assert_eq!(client.remote_buffer(|buf| buf.len()), 0);
// response message // response message
assert!(!state.io().is_write_ready());
assert_eq!(state.io().with_write_buf(|buf| buf.len()).unwrap(), 65536); assert_eq!(state.io().with_write_buf(|buf| buf.len()).unwrap(), 65536);
client.remote_buffer_cap(10240); client.remote_buffer_cap(10240);
@ -765,7 +790,6 @@ mod tests {
assert_eq!(state.io().with_write_buf(|buf| buf.len()).unwrap(), 10240); assert_eq!(state.io().with_write_buf(|buf| buf.len()).unwrap(), 10240);
// backpressure disabled // backpressure disabled
assert!(state.io().is_write_ready());
assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1, 2]); assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1, 2]);
} }
@ -814,7 +838,6 @@ mod tests {
// write side must be closed, dispatcher should fail with keep-alive // write side must be closed, dispatcher should fail with keep-alive
let flags = state.flags(); let flags = state.flags();
assert!(flags.contains(Flags::IO_STOPPING)); assert!(flags.contains(Flags::IO_STOPPING));
assert!(flags.contains(Flags::DSP_KEEPALIVE));
assert!(client.is_closed()); assert!(client.is_closed());
assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1]); assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1]);
} }

View file

@ -5,6 +5,7 @@ use ntex_bytes::BytesMut;
use super::io::Flags; use super::io::Flags;
use super::{Filter, IoRef, ReadStatus, WriteStatus}; use super::{Filter, IoRef, ReadStatus, WriteStatus};
/// Default `Io` filter
pub struct Base(IoRef); pub struct Base(IoRef);
impl Base { impl Base {

View file

@ -7,9 +7,9 @@ use ntex_codec::{Decoder, Encoder};
use ntex_util::{future::poll_fn, future::Either, task::LocalWaker, time::Millis}; use ntex_util::{future::poll_fn, future::Either, task::LocalWaker, time::Millis};
use super::filter::{Base, NullFilter}; use super::filter::{Base, NullFilter};
use super::seal::{IoBoxed, Sealed}; use super::seal::Sealed;
use super::tasks::{ReadContext, WriteContext}; use super::tasks::{ReadContext, WriteContext};
use super::{Filter, FilterFactory, Handle, IoStream, RecvError}; use super::{Filter, FilterFactory, Handle, IoStatusUpdate, IoStream, RecvError};
bitflags::bitflags! { bitflags::bitflags! {
pub struct Flags: u16 { pub struct Flags: u16 {
@ -38,8 +38,6 @@ bitflags::bitflags! {
const DSP_STOP = 0b0000_0010_0000_0000; const DSP_STOP = 0b0000_0010_0000_0000;
/// keep-alive timeout occured /// keep-alive timeout occured
const DSP_KEEPALIVE = 0b0000_0100_0000_0000; const DSP_KEEPALIVE = 0b0000_0100_0000_0000;
/// dispatcher returned error
const DSP_ERR = 0b0000_1000_0000_0000;
} }
} }
@ -48,6 +46,7 @@ enum FilterItem<F> {
Ptr(*mut F), Ptr(*mut F),
} }
/// Interface object to underlying io stream
pub struct Io<F = Base>(pub(super) IoRef, FilterItem<F>); pub struct Io<F = Base>(pub(super) IoRef, FilterItem<F>);
#[derive(Clone)] #[derive(Clone)]
@ -233,13 +232,13 @@ impl Drop for IoState {
impl Io { impl Io {
#[inline] #[inline]
/// Create `State` instance /// Create `Io` instance
pub fn new<I: IoStream>(io: I) -> Self { pub fn new<I: IoStream>(io: I) -> Self {
Self::with_memory_pool(io, PoolId::DEFAULT.pool_ref()) Self::with_memory_pool(io, PoolId::DEFAULT.pool_ref())
} }
#[inline] #[inline]
/// Create `State` instance with specific memory pool. /// Create `Io` instance in specific memory pool.
pub fn with_memory_pool<I: IoStream>(io: I, pool: PoolRef) -> Self { pub fn with_memory_pool<I: IoStream>(io: I, pool: PoolRef) -> Self {
let inner = Rc::new(IoState { let inner = Rc::new(IoState {
pool: Cell::new(pool), pool: Cell::new(pool),
@ -289,7 +288,7 @@ impl<F> Io<F> {
} }
#[inline] #[inline]
/// Set io disconnect timeout in secs /// Set io disconnect timeout in millis
pub fn set_disconnect_timeout(&self, timeout: Millis) { pub fn set_disconnect_timeout(&self, timeout: Millis) {
self.0 .0.disconnect_timeout.set(timeout); self.0 .0.disconnect_timeout.set(timeout);
} }
@ -305,50 +304,26 @@ impl<F> Io<F> {
#[inline] #[inline]
#[allow(clippy::should_implement_trait)] #[allow(clippy::should_implement_trait)]
/// Get IoRef reference /// Get `IoRef` reference
pub fn as_ref(&self) -> &IoRef { pub fn as_ref(&self) -> &IoRef {
&self.0 &self.0
} }
#[inline] #[inline]
/// Get instance of IoRef /// Get instance of `IoRef`
pub fn get_ref(&self) -> IoRef { pub fn get_ref(&self) -> IoRef {
self.0.clone() self.0.clone()
} }
#[inline]
/// Check if dispatcher marked stopped
pub fn is_dispatcher_stopped(&self) -> bool {
self.flags().contains(Flags::DSP_STOP)
}
#[inline]
/// Register dispatcher task
pub fn register_dispatcher(&self, cx: &mut Context<'_>) {
self.0 .0.dispatch_task.register(cx.waker());
}
#[inline]
/// Reset keep-alive error
pub fn reset_keepalive(&self) {
self.0 .0.remove_flags(Flags::DSP_KEEPALIVE)
}
/// Get current io error /// Get current io error
fn error(&self) -> Option<io::Error> { fn error(&self) -> Option<io::Error> {
self.0 .0.error.take() self.0 .0.error.take()
} }
} }
impl Io<Sealed> {
pub fn boxed(self) -> IoBoxed {
self.into()
}
}
impl<F: Filter> Io<F> { impl<F: Filter> Io<F> {
#[inline] #[inline]
/// Get referece to filter /// Get referece to a filter
pub fn filter(&self) -> &F { pub fn filter(&self) -> &F {
if let FilterItem::Ptr(p) = self.1 { if let FilterItem::Ptr(p) = self.1 {
if let Some(r) = unsafe { p.as_ref() } { if let Some(r) = unsafe { p.as_ref() } {
@ -381,6 +356,7 @@ impl<F: Filter> Io<F> {
} }
#[inline] #[inline]
/// Create new filter and replace current one
pub fn add_filter<T>(self, factory: T) -> T::Future pub fn add_filter<T>(self, factory: T) -> T::Future
where where
T: FilterFactory<F>, T: FilterFactory<F>,
@ -389,6 +365,7 @@ impl<F: Filter> Io<F> {
} }
#[inline] #[inline]
/// Map current filter with new one
pub fn map_filter<T, U, E>(mut self, map: U) -> Result<Io<T>, E> pub fn map_filter<T, U, E>(mut self, map: U) -> Result<Io<T>, E>
where where
T: Filter, T: Filter,
@ -459,6 +436,7 @@ impl<F> Io<F> {
#[inline] #[inline]
/// Pause read task /// Pause read task
pub fn pause(&self) { pub fn pause(&self) {
self.0 .0.read_task.wake();
self.0 .0.insert_flags(Flags::RD_PAUSED); self.0 .0.insert_flags(Flags::RD_PAUSED);
} }
@ -490,7 +468,7 @@ impl<F> Io<F> {
} }
#[inline] #[inline]
/// Shut down io stream /// Gracefully shutdown io stream
pub async fn shutdown(&self) -> io::Result<()> { pub async fn shutdown(&self) -> io::Result<()> {
poll_fn(|cx| self.poll_shutdown(cx)).await poll_fn(|cx| self.poll_shutdown(cx)).await
} }
@ -563,8 +541,10 @@ impl<F> Io<F> {
if flags.contains(Flags::IO_STOPPED) { if flags.contains(Flags::IO_STOPPED) {
Poll::Ready(Err(RecvError::PeerGone(self.error()))) Poll::Ready(Err(RecvError::PeerGone(self.error())))
} else if flags.contains(Flags::DSP_STOP) { } else if flags.contains(Flags::DSP_STOP) {
self.0 .0.remove_flags(Flags::DSP_STOP);
Poll::Ready(Err(RecvError::Stop)) Poll::Ready(Err(RecvError::Stop))
} else if flags.contains(Flags::DSP_KEEPALIVE) { } else if flags.contains(Flags::DSP_KEEPALIVE) {
self.0 .0.remove_flags(Flags::DSP_KEEPALIVE);
Poll::Ready(Err(RecvError::KeepAlive)) Poll::Ready(Err(RecvError::KeepAlive))
} else if flags.contains(Flags::WR_BACKPRESSURE) { } else if flags.contains(Flags::WR_BACKPRESSURE) {
Poll::Ready(Err(RecvError::WriteBackpressure)) Poll::Ready(Err(RecvError::WriteBackpressure))
@ -623,7 +603,7 @@ impl<F> Io<F> {
} }
#[inline] #[inline]
/// Shut down io stream /// Gracefully shutdown io stream
pub fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { pub fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let flags = self.flags(); let flags = self.flags();
@ -641,6 +621,26 @@ impl<F> Io<F> {
Poll::Pending Poll::Pending
} }
} }
#[inline]
/// Wait for status updates
pub fn poll_status_update(&self, cx: &mut Context<'_>) -> Poll<IoStatusUpdate> {
let flags = self.flags();
if flags.contains(Flags::IO_STOPPED) {
Poll::Ready(IoStatusUpdate::PeerGone(self.error()))
} else if flags.contains(Flags::DSP_STOP) {
self.0 .0.remove_flags(Flags::DSP_STOP);
Poll::Ready(IoStatusUpdate::Stop)
} else if flags.contains(Flags::DSP_KEEPALIVE) {
self.0 .0.remove_flags(Flags::DSP_KEEPALIVE);
Poll::Ready(IoStatusUpdate::KeepAlive)
} else if flags.contains(Flags::WR_BACKPRESSURE) {
Poll::Ready(IoStatusUpdate::WriteBackpressure)
} else {
self.0 .0.dispatch_task.register(cx.waker());
Poll::Pending
}
}
} }
impl<F> Drop for Io<F> { impl<F> Drop for Io<F> {

View file

@ -38,36 +38,6 @@ impl IoRef {
self.0.flags.get().contains(Flags::IO_STOPPING) self.0.flags.get().contains(Flags::IO_STOPPING)
} }
#[inline]
/// Check if write task is ready
pub fn is_write_ready(&self) -> bool {
!self.0.flags.get().contains(Flags::WR_BACKPRESSURE)
}
#[inline]
/// Check if read buffer has new data
pub fn is_read_ready(&self) -> bool {
self.0.flags.get().contains(Flags::RD_READY)
}
#[inline]
/// Check if write buffer is full
pub fn is_write_buf_full(&self) -> bool {
let len = self
.0
.with_write_buf(|buf| buf.as_ref().map(|b| b.len()).unwrap_or(0));
len >= self.memory_pool().write_params_high()
}
#[inline]
/// Check if read buffer is full
pub fn is_read_buf_full(&self) -> bool {
let len = self
.0
.with_read_buf(false, |buf| buf.as_ref().map(|b| b.len()).unwrap_or(0));
len >= self.memory_pool().read_params_high()
}
#[inline] #[inline]
/// Wake dispatcher task /// Wake dispatcher task
pub fn wake(&self) { pub fn wake(&self) {
@ -77,7 +47,7 @@ impl IoRef {
#[inline] #[inline]
/// Gracefully close connection /// Gracefully close connection
/// ///
/// Notify dispatcher and initiate io stream shutdown process /// Notify dispatcher and initiate io stream shutdown process.
pub fn close(&self) { pub fn close(&self) {
self.0.insert_flags(Flags::DSP_STOP); self.0.insert_flags(Flags::DSP_STOP);
self.0.init_shutdown(None); self.0.init_shutdown(None);
@ -108,13 +78,7 @@ impl IoRef {
} }
#[inline] #[inline]
/// Notify when io stream get disconnected /// Query filter specific data
pub fn on_disconnect(&self) -> OnDisconnect {
OnDisconnect::new(self.0.clone())
}
#[inline]
/// Query specific data
pub fn query<T: 'static>(&self) -> types::QueryItem<T> { pub fn query<T: 'static>(&self) -> types::QueryItem<T> {
if let Some(item) = self.filter().query(any::TypeId::of::<T>()) { if let Some(item) = self.filter().query(any::TypeId::of::<T>()) {
types::QueryItem::new(item) types::QueryItem::new(item)
@ -123,6 +87,67 @@ impl IoRef {
} }
} }
#[inline]
/// Encode and write item to a buffer and wake up write task
pub fn encode<U>(&self, item: U::Item, codec: &U) -> Result<(), <U as Encoder>::Error>
where
U: Encoder,
{
let flags = self.0.flags.get();
if !flags.contains(Flags::IO_STOPPING) {
self.with_write_buf(|buf| {
let (hw, lw) = self.memory_pool().write_params().unpack();
// make sure we've got room
let remaining = buf.remaining_mut();
if remaining < lw {
buf.reserve(hw - remaining);
}
// encode item and wake write task
codec.encode(item, buf)
})
.map_or_else(
|err| {
self.0.io_stopped(Some(err));
Ok(())
},
|item| item,
)
} else {
Ok(())
}
}
#[inline]
/// Attempts to decode a frame from the read buffer
pub fn decode<U>(
&self,
codec: &U,
) -> Result<Option<<U as Decoder>::Item>, <U as Decoder>::Error>
where
U: Decoder,
{
self.0.with_read_buf(false, |buf| {
buf.as_mut().map(|b| codec.decode(b)).unwrap_or(Ok(None))
})
}
#[inline]
/// Write bytes to a buffer and wake up write task
pub fn write(&self, src: &[u8]) -> io::Result<()> {
let flags = self.0.flags.get();
if !flags.intersects(Flags::IO_STOPPING) {
self.with_write_buf(|buf| {
buf.extend_from_slice(src);
})
} else {
Ok(())
}
}
#[inline] #[inline]
/// Get mut access to write buffer /// Get mut access to write buffer
pub fn with_write_buf<F, R>(&self, f: F) -> Result<R, io::Error> pub fn with_write_buf<F, R>(&self, f: F) -> Result<R, io::Error>
@ -159,69 +184,9 @@ impl IoRef {
} }
#[inline] #[inline]
/// Encode and write item to a buffer and wake up write task /// Notify when io stream get disconnected
/// pub fn on_disconnect(&self) -> OnDisconnect {
/// Returns write buffer state, false is returned if write buffer if full. OnDisconnect::new(self.0.clone())
pub fn encode<U>(&self, item: U::Item, codec: &U) -> Result<(), <U as Encoder>::Error>
where
U: Encoder,
{
let flags = self.0.flags.get();
if !flags.contains(Flags::IO_STOPPING) {
self.with_write_buf(|buf| {
let (hw, lw) = self.memory_pool().write_params().unpack();
// make sure we've got room
let remaining = buf.remaining_mut();
if remaining < lw {
buf.reserve(hw - remaining);
}
// encode item and wake write task
codec.encode(item, buf)
})
.map_or_else(
|err| {
self.0.io_stopped(Some(err));
Ok(())
},
|item| item,
)
} else {
Ok(())
}
}
#[inline]
/// Attempts to decode a frame from the read buffer
///
/// Read buffer ready state gets cleanup if decoder cannot
/// decode any frame.
pub fn decode<U>(
&self,
codec: &U,
) -> Result<Option<<U as Decoder>::Item>, <U as Decoder>::Error>
where
U: Decoder,
{
self.0.with_read_buf(false, |buf| {
buf.as_mut().map(|b| codec.decode(b)).unwrap_or(Ok(None))
})
}
#[inline]
/// Write bytes to a buffer and wake up write task
pub fn write(&self, src: &[u8]) -> io::Result<()> {
let flags = self.0.flags.get();
if !flags.intersects(Flags::IO_STOPPING) {
self.with_write_buf(|buf| {
buf.extend_from_slice(src);
})
} else {
Ok(())
}
} }
} }
@ -257,9 +222,6 @@ mod tests {
client.write(TEXT); client.write(TEXT);
let state = Io::new(server); let state = Io::new(server);
assert!(!state.is_read_buf_full());
assert!(!state.is_write_buf_full());
let msg = state.recv(&BytesCodec).await.unwrap().unwrap(); let msg = state.recv(&BytesCodec).await.unwrap().unwrap();
assert_eq!(msg, Bytes::from_static(BIN)); assert_eq!(msg, Bytes::from_static(BIN));

View file

@ -16,7 +16,7 @@ mod ioref;
mod seal; mod seal;
mod tasks; mod tasks;
mod time; mod time;
mod utils; pub mod utils;
#[cfg(feature = "async-std")] #[cfg(feature = "async-std")]
mod asyncstd_rt; mod asyncstd_rt;
@ -31,24 +31,29 @@ use ntex_util::time::Millis;
pub use self::dispatcher::Dispatcher; pub use self::dispatcher::Dispatcher;
pub use self::filter::Base; pub use self::filter::Base;
pub use self::framed::Framed; pub use self::io::{Io, IoRef};
pub use self::io::{Io, IoRef, OnDisconnect};
pub use self::seal::{IoBoxed, Sealed}; pub use self::seal::{IoBoxed, Sealed};
pub use self::tasks::{ReadContext, WriteContext}; pub use self::tasks::{ReadContext, WriteContext};
pub use self::time::Timer; pub use self::time::Timer;
pub use self::utils::{add_filter, boxed, seal, Boxed, BoxedFactory}; pub use self::utils::filter;
/// Status for read task
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum ReadStatus { pub enum ReadStatus {
Ready, Ready,
Terminate, Terminate,
} }
/// Status for write task
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum WriteStatus { pub enum WriteStatus {
/// Write task is clear to proceed with write operation
Ready, Ready,
/// Initiate timeout for normal write operations, shutdown connection after timeout
Timeout(Millis), Timeout(Millis),
/// Initiate graceful io shutdown operation with timeout
Shutdown(Millis), Shutdown(Millis),
/// Immediately terminate connection
Terminate, Terminate,
} }
@ -69,19 +74,26 @@ pub trait Filter: 'static {
fn release_write_buf(&self, buf: BytesMut) -> sio::Result<()>; fn release_write_buf(&self, buf: BytesMut) -> sio::Result<()>;
/// Check readiness for read operations
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus>; fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus>;
/// Check readiness for write operations
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus>; fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus>;
/// Gracefully shutdown filter
fn poll_shutdown(&self) -> Poll<sio::Result<()>>; fn poll_shutdown(&self) -> Poll<sio::Result<()>>;
} }
/// Creates new `Filter` values.
pub trait FilterFactory<F: Filter>: Sized { pub trait FilterFactory<F: Filter>: Sized {
/// The `Filter` value created by this factory
type Filter: Filter; type Filter: Filter;
/// Errors produced while building a filter.
type Error: fmt::Debug; type Error: fmt::Debug;
/// The future of the `FilterFactory` instance.
type Future: Future<Output = Result<Io<Self::Filter>, Self::Error>>; type Future: Future<Output = Result<Io<Self::Filter>, Self::Error>>;
/// Create and return a new filter value asynchronously.
fn create(self, st: Io<F>) -> Self::Future; fn create(self, st: Io<F>) -> Self::Future;
} }
@ -93,6 +105,19 @@ pub trait Handle {
fn query(&self, id: TypeId) -> Option<Box<dyn Any>>; fn query(&self, id: TypeId) -> Option<Box<dyn Any>>;
} }
/// Io status
#[derive(Debug)]
pub enum IoStatusUpdate {
/// Keep-alive timeout occured
KeepAlive,
/// Write backpressure is enabled
WriteBackpressure,
/// Stop io stream handling
Stop,
/// Peer is disconnected
PeerGone(Option<sio::Error>),
}
/// Recv error /// Recv error
#[derive(Debug)] #[derive(Debug)]
pub enum RecvError<U: Decoder> { pub enum RecvError<U: Decoder> {

View file

@ -2,8 +2,10 @@ use std::ops;
use crate::{Filter, Io}; use crate::{Filter, Io};
/// Sealed filter type
pub struct Sealed(pub(crate) Box<dyn Filter>); pub struct Sealed(pub(crate) Box<dyn Filter>);
/// Boxed `Io` object with erased filter type
pub struct IoBoxed(Io<Sealed>); pub struct IoBoxed(Io<Sealed>);
impl From<Io<Sealed>> for IoBoxed { impl From<Io<Sealed>> for IoBoxed {

View file

@ -4,6 +4,7 @@ use ntex_bytes::{BytesMut, PoolRef};
use super::{io::Flags, IoRef, ReadStatus, WriteStatus}; use super::{io::Flags, IoRef, ReadStatus, WriteStatus};
/// Context for io read task
pub struct ReadContext(IoRef); pub struct ReadContext(IoRef);
impl ReadContext { impl ReadContext {
@ -12,16 +13,19 @@ impl ReadContext {
} }
#[inline] #[inline]
/// Return memory pool for this context
pub fn memory_pool(&self) -> PoolRef { pub fn memory_pool(&self) -> PoolRef {
self.0.memory_pool() self.0.memory_pool()
} }
#[inline] #[inline]
/// Check readiness for read operations
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus> { pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus> {
self.0.filter().poll_read_ready(cx) self.0.filter().poll_read_ready(cx)
} }
#[inline] #[inline]
/// Get read buffer
pub fn get_read_buf(&self) -> BytesMut { pub fn get_read_buf(&self) -> BytesMut {
self.0 self.0
.filter() .filter()
@ -30,6 +34,7 @@ impl ReadContext {
} }
#[inline] #[inline]
/// Release read buffer after io read operations
pub fn release_read_buf(&self, buf: BytesMut, nbytes: usize) { pub fn release_read_buf(&self, buf: BytesMut, nbytes: usize) {
if buf.is_empty() { if buf.is_empty() {
self.0.memory_pool().release_read_buf(buf); self.0.memory_pool().release_read_buf(buf);
@ -72,11 +77,13 @@ impl ReadContext {
} }
#[inline] #[inline]
/// Indicate that io task is stopped
pub fn close(&self, err: Option<io::Error>) { pub fn close(&self, err: Option<io::Error>) {
self.0 .0.io_stopped(err); self.0 .0.io_stopped(err);
} }
} }
/// Context for io write task
pub struct WriteContext(IoRef); pub struct WriteContext(IoRef);
impl WriteContext { impl WriteContext {
@ -85,21 +92,25 @@ impl WriteContext {
} }
#[inline] #[inline]
/// Return memory pool for this context
pub fn memory_pool(&self) -> PoolRef { pub fn memory_pool(&self) -> PoolRef {
self.0.memory_pool() self.0.memory_pool()
} }
#[inline] #[inline]
/// Check readiness for write operations
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus> { pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus> {
self.0.filter().poll_write_ready(cx) self.0.filter().poll_write_ready(cx)
} }
#[inline] #[inline]
/// Get write buffer
pub fn get_write_buf(&self) -> Option<BytesMut> { pub fn get_write_buf(&self) -> Option<BytesMut> {
self.0 .0.write_buf.take() self.0 .0.write_buf.take()
} }
#[inline] #[inline]
/// Release write buffer after io write operations
pub fn release_write_buf(&self, buf: BytesMut) -> Result<(), io::Error> { pub fn release_write_buf(&self, buf: BytesMut) -> Result<(), io::Error> {
let pool = self.0.memory_pool(); let pool = self.0.memory_pool();
let mut flags = self.0.flags(); let mut flags = self.0.flags();
@ -131,6 +142,7 @@ impl WriteContext {
} }
#[inline] #[inline]
/// Indicate that io task is stopped
pub fn close(&self, err: Option<io::Error>) { pub fn close(&self, err: Option<io::Error>) {
self.0 .0.io_stopped(err); self.0 .0.io_stopped(err);
} }

View file

@ -1,3 +1,4 @@
//! utilities and helpers for testing
use std::cell::{Cell, RefCell}; use std::cell::{Cell, RefCell};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker}; use std::task::{Context, Poll, Waker};

View file

@ -3,7 +3,9 @@ use std::{future::Future, marker::PhantomData, pin::Pin, task::Context, task::Po
use ntex_service::{fn_factory_with_config, into_service, Service, ServiceFactory}; use ntex_service::{fn_factory_with_config, into_service, Service, ServiceFactory};
use ntex_util::{future::Ready, ready}; use ntex_util::{future::Ready, ready};
use super::{Filter, FilterFactory, Io, IoBoxed}; pub use crate::framed::Framed;
pub use crate::io::OnDisconnect;
use crate::{Filter, FilterFactory, Io, IoBoxed};
/// Service that converts any Io<F> stream to IoBoxed stream /// Service that converts any Io<F> stream to IoBoxed stream
pub fn seal<F, S, C>( pub fn seal<F, S, C>(
@ -41,7 +43,7 @@ where
} }
/// Create filter factory service /// Create filter factory service
pub fn add_filter<T, F>(filter: T) -> FilterServiceFactory<T, F> pub fn filter<T, F>(filter: T) -> FilterServiceFactory<T, F>
where where
T: FilterFactory<F> + Clone, T: FilterFactory<F> + Clone,
F: Filter, F: Filter,
@ -137,6 +139,7 @@ where
} }
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
#[doc(hidden)]
pub struct BoxedFactoryResponse<S: ServiceFactory<R, C>, R, C> { pub struct BoxedFactoryResponse<S: ServiceFactory<R, C>, R, C> {
#[pin] #[pin]
fut: S::Future, fut: S::Future,
@ -156,6 +159,7 @@ impl<S: ServiceFactory<R, C>, R, C> Future for BoxedFactoryResponse<S, R, C> {
} }
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
#[doc(hidden)]
pub struct BoxedResponse<S: Service<R>, R> { pub struct BoxedResponse<S: Service<R>, R> {
#[pin] #[pin]
fut: S::Future, fut: S::Future,

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-rt" name = "ntex-rt"
version = "0.4.0-b.3" version = "0.4.0"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex runtime" description = "ntex runtime"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -26,8 +26,8 @@ async-std = ["ntex-io/async-std", "async_std/unstable"]
[dependencies] [dependencies]
ntex-bytes = "0.1.8" ntex-bytes = "0.1.8"
ntex-io = "0.1.0-b.9" ntex-io = "0.1.0"
ntex-util = "0.1.3" ntex-util = "0.1.4"
async-oneshot = "0.5.0" async-oneshot = "0.5.0"
async-channel = "1.6.1" async-channel = "1.6.1"
derive_more = "0.99.14" derive_more = "0.99.14"

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-service" name = "ntex-service"
version = "0.3.0-b.0" version = "0.3.0"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex service" description = "ntex service"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -16,8 +16,8 @@ name = "ntex_service"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
ntex-util = "0.1.4" ntex-util = "0.1.5"
pin-project-lite = "0.2.6" pin-project-lite = "0.2.6"
[dev-dependencies] [dev-dependencies]
ntex = "0.5.0-b.0" ntex = "0.5.0"

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.1.0] - 2021-12-30
* Upgrade to ntex-io 0.1
## [0.1.0-b.5] - 2021-12-28 ## [0.1.0-b.5] - 2021-12-28
* Proper handling for openssl ZERO_RETURN error * Proper handling for openssl ZERO_RETURN error

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-tls" name = "ntex-tls"
version = "0.1.0-b.7" version = "0.1.0"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "An implementation of SSL streams for ntex backed by OpenSSL" description = "An implementation of SSL streams for ntex backed by OpenSSL"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -28,7 +28,7 @@ rustls = ["tls_rust"]
ntex-bytes = "0.1.8" ntex-bytes = "0.1.8"
ntex-io = "0.1.0-b.10" ntex-io = "0.1.0-b.10"
ntex-util = "0.1.5" ntex-util = "0.1.5"
ntex-service = "0.3.0-b.0" ntex-service = "0.3.0"
pin-project-lite = "0.2" pin-project-lite = "0.2"
# openssl # openssl
@ -38,7 +38,7 @@ tls_openssl = { version="0.10", package = "openssl", optional = true }
tls_rust = { version = "0.20", package = "rustls", optional = true } tls_rust = { version = "0.20", package = "rustls", optional = true }
[dev-dependencies] [dev-dependencies]
ntex = { version = "0.5.0-b.0", features = ["openssl", "rustls"] } ntex = { version = "0.5.0", features = ["openssl", "rustls"] }
log = "0.4" log = "0.4"
env_logger = "0.9" env_logger = "0.9"
rustls-pemfile = { version = "0.2" } rustls-pemfile = { version = "0.2" }

View file

@ -1,11 +1,10 @@
use std::{fs::File, io, io::BufReader, sync::Arc}; use std::{fs::File, io, io::BufReader, sync::Arc};
use ntex::service::{fn_service, pipeline_factory}; use ntex::service::{fn_service, pipeline_factory};
use ntex::{codec, io::add_filter, io::Io, server, util::Either}; use ntex::{codec, io::filter, io::Io, server, util::Either};
use ntex_tls::rustls::TlsAcceptor; use ntex_tls::rustls::TlsAcceptor;
use rustls_pemfile::{certs, rsa_private_keys}; use rustls_pemfile::{certs, rsa_private_keys};
use tls_rust::{Certificate, PrivateKey, ServerConfig}; use tls_rust::{Certificate, PrivateKey, ServerConfig};
// use tls_openssl::ssl::{self, SslFiletype, SslMethod};
#[ntex::main] #[ntex::main]
async fn main() -> io::Result<()> { async fn main() -> io::Result<()> {
@ -35,7 +34,7 @@ async fn main() -> io::Result<()> {
// start server // start server
server::ServerBuilder::new() server::ServerBuilder::new()
.bind("basic", "127.0.0.1:8443", move |_| { .bind("basic", "127.0.0.1:8443", move |_| {
pipeline_factory(add_filter(TlsAcceptor::new(tls_config.clone()))).and_then( pipeline_factory(filter(TlsAcceptor::new(tls_config.clone()))).and_then(
fn_service(|io: Io<_>| async move { fn_service(|io: Io<_>| async move {
println!("New client is connected"); println!("New client is connected");

View file

@ -1,7 +1,7 @@
use std::io; use std::io;
use ntex::service::{fn_service, pipeline_factory}; use ntex::service::{fn_service, pipeline_factory};
use ntex::{codec, io::add_filter, io::Io, server, util::Either}; use ntex::{codec, io::utils::filter, io::Io, server, util::Either};
use ntex_tls::openssl::{PeerCert, PeerCertChain, SslAcceptor}; use ntex_tls::openssl::{PeerCert, PeerCertChain, SslAcceptor};
use tls_openssl::ssl::{self, SslFiletype, SslMethod, SslVerifyMode}; use tls_openssl::ssl::{self, SslFiletype, SslMethod, SslVerifyMode};
@ -27,7 +27,7 @@ async fn main() -> io::Result<()> {
// start server // start server
server::ServerBuilder::new() server::ServerBuilder::new()
.bind("basic", "127.0.0.1:8443", move |_| { .bind("basic", "127.0.0.1:8443", move |_| {
pipeline_factory(add_filter(SslAcceptor::new(acceptor.clone()))).and_then( pipeline_factory(filter(SslAcceptor::new(acceptor.clone()))).and_then(
fn_service(|io: Io<_>| async move { fn_service(|io: Io<_>| async move {
println!("New client is connected"); println!("New client is connected");
if let Some(cert) = io.query::<PeerCert>().as_ref() { if let Some(cert) = io.query::<PeerCert>().as_ref() {

View file

@ -1,7 +1,7 @@
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{error::Error, future::Future, marker::PhantomData, pin::Pin}; use std::{error::Error, future::Future, marker::PhantomData, pin::Pin};
use ntex_io::{BoxedFactory, Filter, FilterFactory, Io}; use ntex_io::{utils::BoxedFactory, Filter, FilterFactory, Io};
use ntex_service::{Service, ServiceFactory}; use ntex_service::{Service, ServiceFactory};
use ntex_util::{future::Ready, time::Millis}; use ntex_util::{future::Ready, time::Millis};
use tls_openssl::ssl::SslAcceptor; use tls_openssl::ssl::SslAcceptor;

View file

@ -1,8 +1,10 @@
# Changes # Changes
## [0.5.0-b.8] - 2021-12-xx ## [0.5.0] - 2021-12-30
* Update cookie 0.16 * Upgrade to ntex-io 0.1
* Updrade to cookie 0.16
## [0.5.0-b.7] - 2021-12-30 ## [0.5.0-b.7] - 2021-12-30

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex" name = "ntex"
version = "0.5.0-b.7" version = "0.5.0"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services" description = "Framework for composable network services"
readme = "README.md" readme = "README.md"
@ -47,13 +47,13 @@ async-std = ["ntex-rt/async-std"]
[dependencies] [dependencies]
ntex-codec = "0.6.0" ntex-codec = "0.6.0"
ntex-router = "0.5.1" ntex-router = "0.5.1"
ntex-service = "0.3.0-b.0" ntex-service = "0.3.0"
ntex-macros = "0.1.3" ntex-macros = "0.1.3"
ntex-util = "0.1.5" ntex-util = "0.1.5"
ntex-bytes = "0.1.8" ntex-bytes = "0.1.8"
ntex-tls = "0.1.0-b.7" ntex-tls = "0.1.0"
ntex-rt = "0.4.0-b.3" ntex-rt = "0.4.0"
ntex-io = { version = "0.1.0-b.10", features = ["tokio-traits"] } ntex-io = { version = "0.1.0", features = ["tokio-traits"] }
base64 = "0.13" base64 = "0.13"
bitflags = "1.3" bitflags = "1.3"

View file

@ -5,7 +5,7 @@ pub use tls_openssl::ssl::{Error as SslError, HandshakeError, SslConnector, SslM
use ntex_tls::openssl::SslConnector as IoSslConnector; use ntex_tls::openssl::SslConnector as IoSslConnector;
use crate::io::{Base, Boxed, Io}; use crate::io::{utils::Boxed, Base, Io};
use crate::service::{Service, ServiceFactory}; use crate::service::{Service, ServiceFactory};
use crate::util::{PoolId, Ready}; use crate::util::{PoolId, Ready};

View file

@ -5,7 +5,7 @@ pub use tls_rustls::{ClientConfig, ServerName};
use ntex_tls::rustls::TlsConnector; use ntex_tls::rustls::TlsConnector;
use crate::io::{Base, Boxed, Io}; use crate::io::{utils::Boxed, Base, Io};
use crate::service::{Service, ServiceFactory}; use crate::service::{Service, ServiceFactory};
use crate::util::{PoolId, Ready}; use crate::util::{PoolId, Ready};

View file

@ -1,7 +1,7 @@
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{collections::VecDeque, future::Future, io, net::SocketAddr, pin::Pin}; use std::{collections::VecDeque, future::Future, io, net::SocketAddr, pin::Pin};
use crate::io::{types, Boxed, Io}; use crate::io::{types, utils::Boxed, Io};
use crate::rt::tcp_connect_in; use crate::rt::tcp_connect_in;
use crate::service::{Service, ServiceFactory}; use crate::service::{Service, ServiceFactory};
use crate::util::{Either, PoolId, PoolRef, Ready}; use crate::util::{Either, PoolId, PoolRef, Ready};

View file

@ -450,7 +450,6 @@ where
.timer_h1 .timer_h1
.register(expire, self.expire, &self.state); .register(expire, self.expire, &self.state);
self.expire = expire; self.expire = expire;
self.io().reset_keepalive();
} }
} }
} }

View file

@ -1,6 +1,6 @@
use std::{future::Future, rc::Rc}; use std::{future::Future, rc::Rc};
use crate::io::{IoRef, OnDisconnect}; use crate::io::{utils::OnDisconnect, IoRef};
use crate::ws; use crate::ws;
pub struct WsSink(Rc<WsSinkInner>); pub struct WsSink(Rc<WsSinkInner>);