Migrate ntex-io to async fn in trait

This commit is contained in:
Nikolay Kim 2024-01-07 04:35:30 +06:00
parent 9119f997fd
commit 60620d4587
7 changed files with 44 additions and 106 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [1.0.0] - 2024-01-0x
* Use "async fn" in trait for Service definition
## [0.3.17] - 2023-12-25 ## [0.3.17] - 2023-12-25
* Fix filter leak during Io drop * Fix filter leak during Io drop

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-io" name = "ntex-io"
version = "0.3.17" version = "1.0.0"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames" description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -18,8 +18,8 @@ path = "src/lib.rs"
[dependencies] [dependencies]
ntex-codec = "0.6.2" ntex-codec = "0.6.2"
ntex-bytes = "0.1.21" ntex-bytes = "0.1.21"
ntex-util = "0.3.4" ntex-util = "1.0.0"
ntex-service = "1.2.7" ntex-service = "2.0.0"
bitflags = "2.4" bitflags = "2.4"
log = "0.4" log = "0.4"

View file

@ -1,5 +1,5 @@
//! Framed transport dispatcher //! Framed transport dispatcher
use std::{cell::Cell, future, pin::Pin, rc::Rc, task::Context, task::Poll, time}; use std::{cell::Cell, future, pin::Pin, rc::Rc, task::Context, task::Poll};
use ntex_bytes::Pool; use ntex_bytes::Pool;
use ntex_codec::{Decoder, Encoder}; use ntex_codec::{Decoder, Encoder};
@ -38,17 +38,9 @@ impl Default for DispatcherConfig {
} }
impl DispatcherConfig { impl DispatcherConfig {
#[doc(hidden)]
#[deprecated(since = "0.3.12")]
#[inline] #[inline]
/// Get keep-alive timeout /// Get keep-alive timeout
pub fn keepalive_timeout(&self) -> time::Duration { pub fn keepalive_timeout(&self) -> Seconds {
self.0.keepalive_timeout.get().into()
}
#[inline]
/// Get keep-alive timeout
pub fn keepalive_timeout_secs(&self) -> Seconds {
self.0.keepalive_timeout.get() self.0.keepalive_timeout.get()
} }
@ -58,25 +50,9 @@ impl DispatcherConfig {
self.0.disconnect_timeout.get() self.0.disconnect_timeout.get()
} }
#[doc(hidden)]
#[deprecated(since = "0.3.12")]
#[inline] #[inline]
/// Get frame read rate /// Get frame read rate
pub fn frame_read_rate(&self) -> Option<(time::Duration, time::Duration, u16)> { pub fn frame_read_rate(&self) -> Option<(Seconds, Seconds, u16)> {
if self.0.frame_read_enabled.get() {
Some((
self.0.frame_read_timeout.get().into(),
self.0.frame_read_max_timeout.get().into(),
self.0.frame_read_rate.get(),
))
} else {
None
}
}
#[inline]
/// Get frame read rate
pub fn frame_read_rate_params(&self) -> Option<(Seconds, Seconds, u16)> {
if self.0.frame_read_enabled.get() { if self.0.frame_read_enabled.get() {
Some(( Some((
self.0.frame_read_timeout.get(), self.0.frame_read_timeout.get(),
@ -219,7 +195,7 @@ where
U: Decoder + Encoder, U: Decoder + Encoder,
{ {
/// Construct new `Dispatcher` instance. /// Construct new `Dispatcher` instance.
pub fn with_config<Io, F>( pub fn new<Io, F>(
io: Io, io: Io,
codec: U, codec: U,
service: F, service: F,
@ -232,7 +208,7 @@ where
let io = IoBoxed::from(io); let io = IoBoxed::from(io);
io.set_disconnect_timeout(cfg.disconnect_timeout()); io.set_disconnect_timeout(cfg.disconnect_timeout());
let flags = if cfg.keepalive_timeout_secs().is_zero() { let flags = if cfg.keepalive_timeout().is_zero() {
Flags::empty() Flags::empty()
} else { } else {
Flags::KA_ENABLED Flags::KA_ENABLED
@ -261,17 +237,6 @@ where
}, },
} }
} }
#[doc(hidden)]
#[deprecated(since = "0.3.6", note = "Use Dispatcher::with_config() method")]
/// Construct new `Dispatcher` instance.
pub fn new<Io, F>(io: Io, codec: U, service: F) -> Dispatcher<S, U>
where
IoBoxed: From<Io>,
F: IntoService<S, DispatchItem<U>>,
{
Self::with_config(io, codec, service, &DispatcherConfig::default())
}
} }
impl<S, U> DispatcherShared<S, U> impl<S, U> DispatcherShared<S, U>
@ -560,14 +525,12 @@ where
log::debug!( log::debug!(
"{}: Start keep-alive timer {:?}", "{}: Start keep-alive timer {:?}",
self.shared.io.tag(), self.shared.io.tag(),
self.cfg.keepalive_timeout_secs() self.cfg.keepalive_timeout()
); );
self.flags.insert(Flags::KA_TIMEOUT); self.flags.insert(Flags::KA_TIMEOUT);
self.shared self.shared.io.start_timer(self.cfg.keepalive_timeout());
.io
.start_timer_secs(self.cfg.keepalive_timeout_secs());
} }
} else if let Some((timeout, max, _)) = self.cfg.frame_read_rate_params() { } else if let Some((timeout, max, _)) = self.cfg.frame_read_rate() {
// we got new data but not enough to parse single frame // we got new data but not enough to parse single frame
// start read timer // start read timer
self.flags.insert(Flags::READ_TIMEOUT); self.flags.insert(Flags::READ_TIMEOUT);
@ -575,14 +538,14 @@ where
self.read_remains = decoded.remains as u32; self.read_remains = decoded.remains as u32;
self.read_remains_prev = 0; self.read_remains_prev = 0;
self.read_max_timeout = max; self.read_max_timeout = max;
self.shared.io.start_timer_secs(timeout); self.shared.io.start_timer(timeout);
} }
} }
fn handle_timeout(&mut self) -> Result<(), DispatchItem<U>> { fn handle_timeout(&mut self) -> Result<(), DispatchItem<U>> {
// check read timer // check read timer
if self.flags.contains(Flags::READ_TIMEOUT) { if self.flags.contains(Flags::READ_TIMEOUT) {
if let Some((timeout, max, rate)) = self.cfg.frame_read_rate_params() { if let Some((timeout, max, rate)) = self.cfg.frame_read_rate() {
let total = (self.read_remains - self.read_remains_prev) let total = (self.read_remains - self.read_remains_prev)
.try_into() .try_into()
.unwrap_or(u16::MAX); .unwrap_or(u16::MAX);
@ -603,7 +566,7 @@ where
self.shared.io.tag(), self.shared.io.tag(),
total total
); );
self.shared.io.start_timer_secs(timeout); self.shared.io.start_timer(timeout);
return Ok(()); return Ok(());
} }
log::trace!( log::trace!(
@ -627,12 +590,12 @@ where
mod tests { mod tests {
use rand::Rng; use rand::Rng;
use std::sync::{atomic::AtomicBool, atomic::Ordering::Relaxed, Arc, Mutex}; use std::sync::{atomic::AtomicBool, atomic::Ordering::Relaxed, Arc, Mutex};
use std::{cell::RefCell, io, time::Duration}; use std::{cell::RefCell, io};
use ntex_bytes::{Bytes, BytesMut, PoolId, PoolRef}; use ntex_bytes::{Bytes, BytesMut, PoolId, PoolRef};
use ntex_codec::BytesCodec; use ntex_codec::BytesCodec;
use ntex_service::ServiceCtx; use ntex_service::ServiceCtx;
use ntex_util::{future::Ready, time::sleep, time::Millis, time::Seconds}; use ntex_util::{time::sleep, time::Millis, time::Seconds};
use super::*; use super::*;
use crate::{io::Flags, testing::IoTest, Io, IoRef, IoStream}; use crate::{io::Flags, testing::IoTest, Io, IoRef, IoStream};
@ -713,14 +676,14 @@ mod tests {
state.set_disconnect_timeout(cfg.disconnect_timeout()); state.set_disconnect_timeout(cfg.disconnect_timeout());
state.set_tag("DBG"); state.set_tag("DBG");
let flags = if cfg.keepalive_timeout_secs().is_zero() { let flags = if cfg.keepalive_timeout().is_zero() {
super::Flags::empty() super::Flags::empty()
} else { } else {
super::Flags::KA_ENABLED super::Flags::KA_ENABLED
}; };
let inner = State(state.get_ref()); let inner = State(state.get_ref());
state.start_timer(Duration::from_millis(500)); state.start_timer(Seconds::ONE);
let shared = Rc::new(DispatcherShared { let shared = Rc::new(DispatcherShared {
codec, codec,
@ -870,19 +833,18 @@ mod tests {
impl Service<DispatchItem<BytesCodec>> for Srv { impl Service<DispatchItem<BytesCodec>> for Srv {
type Response = Option<Response<BytesCodec>>; type Response = Option<Response<BytesCodec>>;
type Error = (); type Error = ();
type Future<'f> = Ready<Option<Response<BytesCodec>>, ()>;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), ()>> { fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), ()>> {
self.0.set(self.0.get() + 1); self.0.set(self.0.get() + 1);
Poll::Ready(Err(())) Poll::Ready(Err(()))
} }
fn call<'a>( async fn call<'a>(
&'a self, &'a self,
_: DispatchItem<BytesCodec>, _: DispatchItem<BytesCodec>,
_: ServiceCtx<'a, Self>, _: ServiceCtx<'a, Self>,
) -> Self::Future<'a> { ) -> Result<Self::Response, Self::Error> {
Ready::Ok(None) Ok(None)
} }
} }

View file

@ -1,10 +1,11 @@
use std::cell::Cell; use std::cell::Cell;
use std::future::{poll_fn, Future};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{fmt, future::Future, hash, io, marker, mem, ops, pin::Pin, ptr, rc::Rc}; use std::{fmt, hash, io, marker, mem, ops, pin::Pin, ptr, rc::Rc};
use ntex_bytes::{PoolId, PoolRef}; use ntex_bytes::{PoolId, PoolRef};
use ntex_codec::{Decoder, Encoder}; use ntex_codec::{Decoder, Encoder};
use ntex_util::{future::poll_fn, future::Either, task::LocalWaker, time::Seconds}; use ntex_util::{future::Either, task::LocalWaker, time::Seconds};
use crate::buf::Stack; use crate::buf::Stack;
use crate::filter::{Base, Filter, Layer, NullFilter}; use crate::filter::{Base, Filter, Layer, NullFilter};

View file

@ -1,4 +1,4 @@
use std::{any, fmt, hash, io, time}; use std::{any, fmt, hash, io};
use ntex_bytes::{BytesVec, PoolRef}; use ntex_bytes::{BytesVec, PoolRef};
use ntex_codec::{Decoder, Encoder}; use ntex_codec::{Decoder, Encoder};
@ -225,23 +225,9 @@ impl IoRef {
self.0.timeout.get() self.0.timeout.get()
} }
#[doc(hidden)]
#[deprecated(since = "0.3.12")]
#[inline]
/// current timer deadline
pub fn timer_deadline(&self) -> time::Instant {
self.0.timeout.get().instant()
}
#[inline] #[inline]
/// Start timer /// Start timer
pub fn start_timer(&self, timeout: time::Duration) { pub fn start_timer(&self, timeout: Seconds) -> timer::TimerHandle {
self.start_timer_secs(Seconds(timeout.as_secs() as u16));
}
#[inline]
/// Start timer
pub fn start_timer_secs(&self, timeout: Seconds) -> timer::TimerHandle {
let cur_hnd = self.0.timeout.get(); let cur_hnd = self.0.timeout.get();
if !timeout.is_zero() { if !timeout.is_zero() {
@ -278,22 +264,6 @@ impl IoRef {
} }
} }
#[doc(hidden)]
#[deprecated(since = "0.3.6")]
#[inline]
/// Start keep-alive timer
pub fn start_keepalive_timer(&self, timeout: time::Duration) {
self.start_timer(timeout);
}
#[doc(hidden)]
#[deprecated(since = "0.3.6")]
#[inline]
/// Stop keep-alive timer
pub fn stop_keepalive_timer(&self) {
self.stop_timer()
}
#[inline] #[inline]
/// Get tag /// Get tag
pub fn tag(&self) -> &'static str { pub fn tag(&self) -> &'static str {
@ -340,11 +310,11 @@ impl fmt::Debug for IoRef {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::cell::{Cell, RefCell}; use std::cell::{Cell, RefCell};
use std::{future::Future, pin::Pin, rc::Rc, task::Poll}; use std::{future::poll_fn, future::Future, pin::Pin, rc::Rc, task::Poll};
use ntex_bytes::Bytes; use ntex_bytes::Bytes;
use ntex_codec::BytesCodec; use ntex_codec::BytesCodec;
use ntex_util::future::{lazy, poll_fn}; use ntex_util::future::lazy;
use ntex_util::time::{sleep, Millis}; use ntex_util::time::{sleep, Millis};
use super::*; use super::*;

View file

@ -1,10 +1,10 @@
//! utilities and helpers for testing //! utilities and helpers for testing
use std::future::{poll_fn, Future};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::task::{ready, Context, Poll, Waker}; use std::task::{ready, Context, Poll, Waker};
use std::{any, cell::RefCell, cmp, fmt, future::Future, io, mem, net, pin::Pin, rc::Rc}; use std::{any, cell::RefCell, cmp, fmt, io, mem, net, pin::Pin, rc::Rc};
use ntex_bytes::{Buf, BufMut, Bytes, BytesVec}; use ntex_bytes::{Buf, BufMut, Bytes, BytesVec};
use ntex_util::future::poll_fn;
use ntex_util::time::{sleep, Millis, Sleep}; use ntex_util::time::{sleep, Millis, Sleep};
use crate::{types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus}; use crate::{types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus};

View file

@ -66,11 +66,9 @@ where
type Error = T::Error; type Error = T::Error;
type Service = FilterService<T, F>; type Service = FilterService<T, F>;
type InitError = (); type InitError = ();
type Future<'f> = Ready<Self::Service, Self::InitError> where Self: 'f;
#[inline] async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
fn create(&self, _: ()) -> Self::Future<'_> { Ok(FilterService {
Ready::Ok(FilterService {
filter: self.filter.clone(), filter: self.filter.clone(),
_t: PhantomData, _t: PhantomData,
}) })
@ -96,11 +94,14 @@ where
{ {
type Response = Io<Layer<T::Filter, F>>; type Response = Io<Layer<T::Filter, F>>;
type Error = T::Error; type Error = T::Error;
type Future<'f> = T::Future where T: 'f, F: 'f;
#[inline] #[inline]
fn call<'a>(&'a self, req: Io<F>, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { async fn call(
self.filter.clone().create(req) &self,
req: Io<F>,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
self.filter.clone().create(req).await
} }
} }
@ -204,11 +205,11 @@ mod tests {
assert!(NullFilter.query(std::any::TypeId::of::<()>()).is_none()); assert!(NullFilter.query(std::any::TypeId::of::<()>()).is_none());
assert!(NullFilter.shutdown(&ioref, &stack, 0).unwrap().is_ready()); assert!(NullFilter.shutdown(&ioref, &stack, 0).unwrap().is_ready());
assert_eq!( assert_eq!(
ntex_util::future::poll_fn(|cx| NullFilter.poll_read_ready(cx)).await, std::future::poll_fn(|cx| NullFilter.poll_read_ready(cx)).await,
crate::ReadStatus::Terminate crate::ReadStatus::Terminate
); );
assert_eq!( assert_eq!(
ntex_util::future::poll_fn(|cx| NullFilter.poll_write_ready(cx)).await, std::future::poll_fn(|cx| NullFilter.poll_write_ready(cx)).await,
crate::WriteStatus::Terminate crate::WriteStatus::Terminate
); );
assert!(NullFilter.process_write_buf(&ioref, &stack, 0).is_ok()); assert!(NullFilter.process_write_buf(&ioref, &stack, 0).is_ok());