From 60620d4587ac3c0787867888fd399f406efed2e7 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 7 Jan 2024 04:35:30 +0600 Subject: [PATCH] Migrate ntex-io to async fn in trait --- ntex-io/CHANGES.md | 4 +++ ntex-io/Cargo.toml | 6 ++-- ntex-io/src/dispatcher.rs | 74 ++++++++++----------------------------- ntex-io/src/io.rs | 5 +-- ntex-io/src/ioref.rs | 38 +++----------------- ntex-io/src/testing.rs | 4 +-- ntex-io/src/utils.rs | 19 +++++----- 7 files changed, 44 insertions(+), 106 deletions(-) diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index b021e873..7ec9b44f 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [1.0.0] - 2024-01-0x + +* Use "async fn" in trait for Service definition + ## [0.3.17] - 2023-12-25 * Fix filter leak during Io drop diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 03642ca9..25a4ad7a 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.3.17" +version = "1.0.0" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -18,8 +18,8 @@ path = "src/lib.rs" [dependencies] ntex-codec = "0.6.2" ntex-bytes = "0.1.21" -ntex-util = "0.3.4" -ntex-service = "1.2.7" +ntex-util = "1.0.0" +ntex-service = "2.0.0" bitflags = "2.4" log = "0.4" diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index 4f08f398..f8a786b2 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -1,5 +1,5 @@ //! Framed transport dispatcher -use std::{cell::Cell, future, pin::Pin, rc::Rc, task::Context, task::Poll, time}; +use std::{cell::Cell, future, pin::Pin, rc::Rc, task::Context, task::Poll}; use ntex_bytes::Pool; use ntex_codec::{Decoder, Encoder}; @@ -38,17 +38,9 @@ impl Default for DispatcherConfig { } impl DispatcherConfig { - #[doc(hidden)] - #[deprecated(since = "0.3.12")] #[inline] /// Get keep-alive timeout - pub fn keepalive_timeout(&self) -> time::Duration { - self.0.keepalive_timeout.get().into() - } - - #[inline] - /// Get keep-alive timeout - pub fn keepalive_timeout_secs(&self) -> Seconds { + pub fn keepalive_timeout(&self) -> Seconds { self.0.keepalive_timeout.get() } @@ -58,25 +50,9 @@ impl DispatcherConfig { self.0.disconnect_timeout.get() } - #[doc(hidden)] - #[deprecated(since = "0.3.12")] #[inline] /// Get frame read rate - pub fn frame_read_rate(&self) -> Option<(time::Duration, time::Duration, u16)> { - if self.0.frame_read_enabled.get() { - Some(( - self.0.frame_read_timeout.get().into(), - self.0.frame_read_max_timeout.get().into(), - self.0.frame_read_rate.get(), - )) - } else { - None - } - } - - #[inline] - /// Get frame read rate - pub fn frame_read_rate_params(&self) -> Option<(Seconds, Seconds, u16)> { + pub fn frame_read_rate(&self) -> Option<(Seconds, Seconds, u16)> { if self.0.frame_read_enabled.get() { Some(( self.0.frame_read_timeout.get(), @@ -219,7 +195,7 @@ where U: Decoder + Encoder, { /// Construct new `Dispatcher` instance. - pub fn with_config( + pub fn new( io: Io, codec: U, service: F, @@ -232,7 +208,7 @@ where let io = IoBoxed::from(io); io.set_disconnect_timeout(cfg.disconnect_timeout()); - let flags = if cfg.keepalive_timeout_secs().is_zero() { + let flags = if cfg.keepalive_timeout().is_zero() { Flags::empty() } else { Flags::KA_ENABLED @@ -261,17 +237,6 @@ where }, } } - - #[doc(hidden)] - #[deprecated(since = "0.3.6", note = "Use Dispatcher::with_config() method")] - /// Construct new `Dispatcher` instance. - pub fn new(io: Io, codec: U, service: F) -> Dispatcher - where - IoBoxed: From, - F: IntoService>, - { - Self::with_config(io, codec, service, &DispatcherConfig::default()) - } } impl DispatcherShared @@ -560,14 +525,12 @@ where log::debug!( "{}: Start keep-alive timer {:?}", self.shared.io.tag(), - self.cfg.keepalive_timeout_secs() + self.cfg.keepalive_timeout() ); self.flags.insert(Flags::KA_TIMEOUT); - self.shared - .io - .start_timer_secs(self.cfg.keepalive_timeout_secs()); + self.shared.io.start_timer(self.cfg.keepalive_timeout()); } - } else if let Some((timeout, max, _)) = self.cfg.frame_read_rate_params() { + } else if let Some((timeout, max, _)) = self.cfg.frame_read_rate() { // we got new data but not enough to parse single frame // start read timer self.flags.insert(Flags::READ_TIMEOUT); @@ -575,14 +538,14 @@ where self.read_remains = decoded.remains as u32; self.read_remains_prev = 0; self.read_max_timeout = max; - self.shared.io.start_timer_secs(timeout); + self.shared.io.start_timer(timeout); } } fn handle_timeout(&mut self) -> Result<(), DispatchItem> { // check read timer if self.flags.contains(Flags::READ_TIMEOUT) { - if let Some((timeout, max, rate)) = self.cfg.frame_read_rate_params() { + if let Some((timeout, max, rate)) = self.cfg.frame_read_rate() { let total = (self.read_remains - self.read_remains_prev) .try_into() .unwrap_or(u16::MAX); @@ -603,7 +566,7 @@ where self.shared.io.tag(), total ); - self.shared.io.start_timer_secs(timeout); + self.shared.io.start_timer(timeout); return Ok(()); } log::trace!( @@ -627,12 +590,12 @@ where mod tests { use rand::Rng; use std::sync::{atomic::AtomicBool, atomic::Ordering::Relaxed, Arc, Mutex}; - use std::{cell::RefCell, io, time::Duration}; + use std::{cell::RefCell, io}; use ntex_bytes::{Bytes, BytesMut, PoolId, PoolRef}; use ntex_codec::BytesCodec; use ntex_service::ServiceCtx; - use ntex_util::{future::Ready, time::sleep, time::Millis, time::Seconds}; + use ntex_util::{time::sleep, time::Millis, time::Seconds}; use super::*; use crate::{io::Flags, testing::IoTest, Io, IoRef, IoStream}; @@ -713,14 +676,14 @@ mod tests { state.set_disconnect_timeout(cfg.disconnect_timeout()); state.set_tag("DBG"); - let flags = if cfg.keepalive_timeout_secs().is_zero() { + let flags = if cfg.keepalive_timeout().is_zero() { super::Flags::empty() } else { super::Flags::KA_ENABLED }; let inner = State(state.get_ref()); - state.start_timer(Duration::from_millis(500)); + state.start_timer(Seconds::ONE); let shared = Rc::new(DispatcherShared { codec, @@ -870,19 +833,18 @@ mod tests { impl Service> for Srv { type Response = Option>; type Error = (); - type Future<'f> = Ready>, ()>; fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { self.0.set(self.0.get() + 1); Poll::Ready(Err(())) } - fn call<'a>( + async fn call<'a>( &'a self, _: DispatchItem, _: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { - Ready::Ok(None) + ) -> Result { + Ok(None) } } diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index 54a3ad80..28ada462 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -1,10 +1,11 @@ use std::cell::Cell; +use std::future::{poll_fn, Future}; use std::task::{Context, Poll}; -use std::{fmt, future::Future, hash, io, marker, mem, ops, pin::Pin, ptr, rc::Rc}; +use std::{fmt, hash, io, marker, mem, ops, pin::Pin, ptr, rc::Rc}; use ntex_bytes::{PoolId, PoolRef}; use ntex_codec::{Decoder, Encoder}; -use ntex_util::{future::poll_fn, future::Either, task::LocalWaker, time::Seconds}; +use ntex_util::{future::Either, task::LocalWaker, time::Seconds}; use crate::buf::Stack; use crate::filter::{Base, Filter, Layer, NullFilter}; diff --git a/ntex-io/src/ioref.rs b/ntex-io/src/ioref.rs index 9da15255..7d33a952 100644 --- a/ntex-io/src/ioref.rs +++ b/ntex-io/src/ioref.rs @@ -1,4 +1,4 @@ -use std::{any, fmt, hash, io, time}; +use std::{any, fmt, hash, io}; use ntex_bytes::{BytesVec, PoolRef}; use ntex_codec::{Decoder, Encoder}; @@ -225,23 +225,9 @@ impl IoRef { self.0.timeout.get() } - #[doc(hidden)] - #[deprecated(since = "0.3.12")] - #[inline] - /// current timer deadline - pub fn timer_deadline(&self) -> time::Instant { - self.0.timeout.get().instant() - } - #[inline] /// Start timer - pub fn start_timer(&self, timeout: time::Duration) { - self.start_timer_secs(Seconds(timeout.as_secs() as u16)); - } - - #[inline] - /// Start timer - pub fn start_timer_secs(&self, timeout: Seconds) -> timer::TimerHandle { + pub fn start_timer(&self, timeout: Seconds) -> timer::TimerHandle { let cur_hnd = self.0.timeout.get(); if !timeout.is_zero() { @@ -278,22 +264,6 @@ impl IoRef { } } - #[doc(hidden)] - #[deprecated(since = "0.3.6")] - #[inline] - /// Start keep-alive timer - pub fn start_keepalive_timer(&self, timeout: time::Duration) { - self.start_timer(timeout); - } - - #[doc(hidden)] - #[deprecated(since = "0.3.6")] - #[inline] - /// Stop keep-alive timer - pub fn stop_keepalive_timer(&self) { - self.stop_timer() - } - #[inline] /// Get tag pub fn tag(&self) -> &'static str { @@ -340,11 +310,11 @@ impl fmt::Debug for IoRef { #[cfg(test)] mod tests { use std::cell::{Cell, RefCell}; - use std::{future::Future, pin::Pin, rc::Rc, task::Poll}; + use std::{future::poll_fn, future::Future, pin::Pin, rc::Rc, task::Poll}; use ntex_bytes::Bytes; use ntex_codec::BytesCodec; - use ntex_util::future::{lazy, poll_fn}; + use ntex_util::future::lazy; use ntex_util::time::{sleep, Millis}; use super::*; diff --git a/ntex-io/src/testing.rs b/ntex-io/src/testing.rs index 01494852..590d6568 100644 --- a/ntex-io/src/testing.rs +++ b/ntex-io/src/testing.rs @@ -1,10 +1,10 @@ //! utilities and helpers for testing +use std::future::{poll_fn, Future}; use std::sync::{Arc, Mutex}; use std::task::{ready, Context, Poll, Waker}; -use std::{any, cell::RefCell, cmp, fmt, future::Future, io, mem, net, pin::Pin, rc::Rc}; +use std::{any, cell::RefCell, cmp, fmt, io, mem, net, pin::Pin, rc::Rc}; use ntex_bytes::{Buf, BufMut, Bytes, BytesVec}; -use ntex_util::future::poll_fn; use ntex_util::time::{sleep, Millis, Sleep}; use crate::{types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus}; diff --git a/ntex-io/src/utils.rs b/ntex-io/src/utils.rs index 9fbdacec..3a959022 100644 --- a/ntex-io/src/utils.rs +++ b/ntex-io/src/utils.rs @@ -66,11 +66,9 @@ where type Error = T::Error; type Service = FilterService; type InitError = (); - type Future<'f> = Ready where Self: 'f; - #[inline] - fn create(&self, _: ()) -> Self::Future<'_> { - Ready::Ok(FilterService { + async fn create(&self, _: ()) -> Result { + Ok(FilterService { filter: self.filter.clone(), _t: PhantomData, }) @@ -96,11 +94,14 @@ where { type Response = Io>; type Error = T::Error; - type Future<'f> = T::Future where T: 'f, F: 'f; #[inline] - fn call<'a>(&'a self, req: Io, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { - self.filter.clone().create(req) + async fn call( + &self, + req: Io, + _: ServiceCtx<'_, Self>, + ) -> Result { + self.filter.clone().create(req).await } } @@ -204,11 +205,11 @@ mod tests { assert!(NullFilter.query(std::any::TypeId::of::<()>()).is_none()); assert!(NullFilter.shutdown(&ioref, &stack, 0).unwrap().is_ready()); assert_eq!( - ntex_util::future::poll_fn(|cx| NullFilter.poll_read_ready(cx)).await, + std::future::poll_fn(|cx| NullFilter.poll_read_ready(cx)).await, crate::ReadStatus::Terminate ); assert_eq!( - ntex_util::future::poll_fn(|cx| NullFilter.poll_write_ready(cx)).await, + std::future::poll_fn(|cx| NullFilter.poll_write_ready(cx)).await, crate::WriteStatus::Terminate ); assert!(NullFilter.process_write_buf(&ioref, &stack, 0).is_ok());