diff --git a/Cargo.toml b/Cargo.toml index f6ae6aa7..6aa86081 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "ntex-rt", "ntex-service", "ntex-macros", + "ntex-util", ] [patch.crates-io] @@ -14,4 +15,5 @@ ntex-codec = { path = "ntex-codec" } ntex-router = { path = "ntex-router" } ntex-rt = { path = "ntex-rt" } ntex-service = { path = "ntex-service" } -ntex-macros = { path = "ntex-macros" } \ No newline at end of file +ntex-macros = { path = "ntex-macros" } +ntex-util = { path = "ntex-util" } \ No newline at end of file diff --git a/ntex-codec/Cargo.toml b/ntex-codec/Cargo.toml index 5c874ae2..30d9766f 100644 --- a/ntex-codec/Cargo.toml +++ b/ntex-codec/Cargo.toml @@ -18,9 +18,7 @@ path = "src/lib.rs" [dependencies] bitflags = "1.2.1" bytes = "1.0" -ntex-service = "0.1.7" -futures-core = { version = "0.3.13", default-features = false, features = ["alloc"] } -futures-sink = { version = "0.3.13", default-features = false, features = ["alloc"] } +ntex-util = "0.1.0" log = "0.4" tokio = { version = "1", default-features = false } diff --git a/ntex-codec/src/framed.rs b/ntex-codec/src/framed.rs index 8e85d53e..f0b20882 100644 --- a/ntex-codec/src/framed.rs +++ b/ntex-codec/src/framed.rs @@ -3,9 +3,7 @@ use std::task::{Context, Poll}; use std::{fmt, io}; use bytes::{Buf, BytesMut}; -use futures_core::{ready, Stream}; -use futures_sink::Sink; -use ntex_service::util::Either; +use ntex_util::{future::Either, ready, Sink, Stream}; use crate::{AsyncRead, AsyncWrite, Decoder, Encoder}; diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index 3f97de7e..03cbe92f 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -16,5 +16,5 @@ name = "ntex_rt" path = "src/lib.rs" [dependencies] -ntex-service = "0.1.7" +ntex-util = "0.1.0" tokio = { version = "1", default-features = false, features = ["rt", "net", "time", "signal", "sync"] } diff --git a/ntex-rt/src/builder.rs b/ntex-rt/src/builder.rs index fecfae4c..5d73c770 100644 --- a/ntex-rt/src/builder.rs +++ b/ntex-rt/src/builder.rs @@ -1,6 +1,6 @@ use std::{borrow::Cow, future::Future, io}; -use ntex_service::util::lazy; +use ntex_util::future::lazy; use tokio::sync::mpsc::unbounded_channel; use tokio::sync::oneshot::{channel, Receiver}; use tokio::task::LocalSet; diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs index 4007f2c5..18d15d44 100644 --- a/ntex-rt/src/lib.rs +++ b/ntex-rt/src/lib.rs @@ -1,5 +1,5 @@ //! A runtime implementation that runs everything on the current thread. -use ntex_service::util::lazy; +use ntex_util::future::lazy; use std::future::Future; mod arbiter; diff --git a/ntex-service/Cargo.toml b/ntex-service/Cargo.toml index 2352c69c..418a2acf 100644 --- a/ntex-service/Cargo.toml +++ b/ntex-service/Cargo.toml @@ -16,6 +16,7 @@ name = "ntex_service" path = "src/lib.rs" [dependencies] +ntex-util = "0.1.0" pin-project-lite = "0.2.4" [dev-dependencies] diff --git a/ntex-service/src/and_then.rs b/ntex-service/src/and_then.rs index 07a02109..c382b207 100644 --- a/ntex-service/src/and_then.rs +++ b/ntex-service/src/and_then.rs @@ -271,8 +271,8 @@ where mod tests { use std::{cell::Cell, rc::Rc, task::Context, task::Poll}; - use crate::util::{lazy, Ready}; use crate::{fn_factory, pipeline, pipeline_factory, Service, ServiceFactory}; + use ntex_util::future::{lazy, Ready}; struct Srv1(Rc>); diff --git a/ntex-service/src/and_then_apply_fn.rs b/ntex-service/src/and_then_apply_fn.rs index fd088def..0b115df3 100644 --- a/ntex-service/src/and_then_apply_fn.rs +++ b/ntex-service/src/and_then_apply_fn.rs @@ -292,8 +292,8 @@ where mod tests { use super::*; - use crate::util::{lazy, Ready}; use crate::{fn_service, pipeline, pipeline_factory, Service, ServiceFactory}; + use ntex_util::future::{lazy, Ready}; #[derive(Clone)] struct Srv; diff --git a/ntex-service/src/apply.rs b/ntex-service/src/apply.rs index 0eafbfff..93822c4e 100644 --- a/ntex-service/src/apply.rs +++ b/ntex-service/src/apply.rs @@ -211,12 +211,11 @@ where #[cfg(test)] mod tests { + use ntex_util::future::{lazy, Ready}; use std::task::{Context, Poll}; use super::*; - use crate::{ - pipeline, pipeline_factory, util::lazy, util::Ready, Service, ServiceFactory, - }; + use crate::{pipeline, pipeline_factory, Service, ServiceFactory}; #[derive(Clone)] struct Srv; diff --git a/ntex-service/src/apply_cfg.rs b/ntex-service/src/apply_cfg.rs index 13f06d00..ed22990d 100644 --- a/ntex-service/src/apply_cfg.rs +++ b/ntex-service/src/apply_cfg.rs @@ -227,10 +227,11 @@ where #[cfg(test)] mod tests { + use ntex_util::future::Ready; use std::{cell::Cell, rc::Rc}; use super::*; - use crate::{fn_service, util::Ready, Service}; + use crate::{fn_service, Service}; #[ntex::test] async fn test_apply() { diff --git a/ntex-service/src/fn_service.rs b/ntex-service/src/fn_service.rs index 44c522bd..b26f47bf 100644 --- a/ntex-service/src/fn_service.rs +++ b/ntex-service/src/fn_service.rs @@ -1,7 +1,9 @@ use std::task::{Context, Poll}; use std::{cell::Cell, cell::RefCell, future::Future, marker::PhantomData}; -use crate::{util::Ready, IntoService, IntoServiceFactory, Service, ServiceFactory}; +use ntex_util::future::Ready; + +use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory}; #[inline] /// Create `ServiceFactory` for function that can act as a `Service` @@ -523,10 +525,11 @@ where #[cfg(test)] mod tests { + use ntex_util::future::lazy; use std::{rc::Rc, task::Poll}; use super::*; - use crate::{util::lazy, Service, ServiceFactory}; + use crate::{Service, ServiceFactory}; #[ntex::test] async fn test_fn_service() { diff --git a/ntex-service/src/fn_transform.rs b/ntex-service/src/fn_transform.rs index 573c7c15..b1297165 100644 --- a/ntex-service/src/fn_transform.rs +++ b/ntex-service/src/fn_transform.rs @@ -1,6 +1,7 @@ +use ntex_util::future::Ready; use std::{future::Future, marker::PhantomData}; -use crate::{apply_fn, dev::Apply, util::Ready, Service, Transform}; +use crate::{apply_fn, dev::Apply, Service, Transform}; /// Use function as transform service pub fn fn_transform( @@ -67,10 +68,11 @@ where #[cfg(test)] #[allow(clippy::redundant_clone)] mod tests { + use ntex_util::future::lazy; use std::task::{Context, Poll}; use super::*; - use crate::{util::lazy, Service}; + use crate::Service; #[derive(Clone)] struct Srv; diff --git a/ntex-service/src/lib.rs b/ntex-service/src/lib.rs index 774199ed..5a6d8a34 100644 --- a/ntex-service/src/lib.rs +++ b/ntex-service/src/lib.rs @@ -21,10 +21,6 @@ mod then; mod transform; mod transform_err; -mod either; -mod lazy; -mod ready; - pub use self::apply::{apply_fn, apply_fn_factory}; pub use self::fn_service::{ fn_factory, fn_factory_with_config, fn_mut_service, fn_service, @@ -341,12 +337,6 @@ where tp.into_service() } -pub mod util { - pub use crate::either::Either; - pub use crate::lazy::{lazy, Lazy}; - pub use crate::ready::Ready; -} - pub mod dev { pub use crate::apply::{Apply, ApplyServiceFactory}; pub use crate::fn_service::{ diff --git a/ntex-service/src/map.rs b/ntex-service/src/map.rs index 68ab19d0..3b472de3 100644 --- a/ntex-service/src/map.rs +++ b/ntex-service/src/map.rs @@ -206,8 +206,10 @@ where #[cfg(test)] mod tests { + use ntex_util::future::{lazy, Ready}; + use super::*; - use crate::{util::lazy, util::Ready, IntoServiceFactory, Service, ServiceFactory}; + use crate::{IntoServiceFactory, Service, ServiceFactory}; #[derive(Clone)] struct Srv; diff --git a/ntex-service/src/map_config.rs b/ntex-service/src/map_config.rs index 604d7c98..c48195c2 100644 --- a/ntex-service/src/map_config.rs +++ b/ntex-service/src/map_config.rs @@ -304,10 +304,11 @@ where #[cfg(test)] #[allow(clippy::redundant_closure)] mod tests { + use ntex_util::future::Ready; use std::{cell::Cell, rc::Rc}; use super::*; - use crate::{fn_factory_with_config, fn_service, util::Ready, ServiceFactory}; + use crate::{fn_factory_with_config, fn_service, ServiceFactory}; #[ntex::test] async fn test_map_config() { diff --git a/ntex-service/src/map_err.rs b/ntex-service/src/map_err.rs index c81fc06f..ed29c5b2 100644 --- a/ntex-service/src/map_err.rs +++ b/ntex-service/src/map_err.rs @@ -208,8 +208,8 @@ where #[cfg(test)] mod tests { use super::*; - use crate::util::{lazy, Ready}; use crate::{IntoServiceFactory, Service, ServiceFactory}; + use ntex_util::future::{lazy, Ready}; #[derive(Clone)] struct Srv; diff --git a/ntex-service/src/then.rs b/ntex-service/src/then.rs index 2966ef71..ac854465 100644 --- a/ntex-service/src/then.rs +++ b/ntex-service/src/then.rs @@ -256,11 +256,10 @@ where #[cfg(test)] mod tests { + use ntex_util::future::{lazy, Ready}; use std::{cell::Cell, rc::Rc, task::Context, task::Poll}; - use crate::{ - pipeline, pipeline_factory, util::lazy, util::Ready, Service, ServiceFactory, - }; + use crate::{pipeline, pipeline_factory, Service, ServiceFactory}; #[derive(Clone)] struct Srv1(Rc>); diff --git a/ntex-service/src/transform.rs b/ntex-service/src/transform.rs index c59f4dc8..b68d7c39 100644 --- a/ntex-service/src/transform.rs +++ b/ntex-service/src/transform.rs @@ -233,8 +233,10 @@ where #[cfg(test)] #[allow(clippy::redundant_clone)] mod tests { + use ntex_util::future::{lazy, Ready}; + use super::*; - use crate::{fn_service, util::lazy, util::Ready, Service, ServiceFactory}; + use crate::{fn_service, Service, ServiceFactory}; #[derive(Clone)] struct Tr; diff --git a/ntex-util/CHANGES.md b/ntex-util/CHANGES.md new file mode 100644 index 00000000..38407c5a --- /dev/null +++ b/ntex-util/CHANGES.md @@ -0,0 +1,75 @@ +# Changes + +## [0.4.1] - 2021-04-04 + +* Use Either from ntex-service + +## [0.4.0] - 2021-02-23 + +* Migrate to tokio 1.x + +## [0.3.0] - 2021-02-20 + +* Make Encoder and Decoder methods immutable + +## [0.2.2] - 2021-01-21 + +* Flush underlying io stream + +## [0.2.1] - 2020-08-10 + +* Require `Debug` impl for `Error` + +## [0.2.0] - 2020-08-10 + +* Include custom `Encoder` and `Decoder` traits + +* Remove `From` constraint from `Encoder` and `Decoder` traits + +## [0.1.2] - 2020-04-17 + +* Do not swallow unprocessed data on read errors + +## [0.1.1] - 2020-04-07 + +* Optimize io operations + +* Fix framed close method + +## [0.1.0] - 2020-03-31 + +* Fork crate to ntex namespace + +* Use `.advance()` intead of `.split_to()` + +* Add Unpin constraint and remove unneeded unsafe + +## [0.2.0] - 2019-12-10 + +* Use specific futures dependencies + +## [0.2.0-alpha.4] + +* Fix buffer remaining capacity calcualtion + +## [0.2.0-alpha.3] + +* Use tokio 0.2 + +* Fix low/high watermark for write/read buffers + +## [0.2.0-alpha.2] + +* Migrated to `std::future` + +## [0.1.2] - 2019-03-27 + +* Added `Framed::map_io()` method. + +## [0.1.1] - 2019-03-06 + +* Added `FramedParts::with_read_buffer()` method. + +## [0.1.0] - 2018-12-09 + +* Move codec to separate crate diff --git a/ntex-util/Cargo.toml b/ntex-util/Cargo.toml new file mode 100644 index 00000000..dc0cd08e --- /dev/null +++ b/ntex-util/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "ntex-util" +version = "0.1.0" +authors = ["ntex contributors "] +description = "Utilities for ntex framework" +keywords = ["network", "framework", "async", "futures"] +homepage = "https://ntex.rs" +repository = "https://github.com/ntex-rs/ntex.git" +documentation = "https://docs.rs/ntex-util/" +categories = ["network-programming", "asynchronous"] +license = "MIT" +edition = "2018" + +[lib] +name = "ntex_util" +path = "src/lib.rs" + +[dependencies] +bitflags = "1.2" +slab = "0.4" +futures-core = { version = "0.3.13", default-features = false, features = ["alloc"] } +futures-sink = { version = "0.3.13", default-features = false, features = ["alloc"] } +pin-project-lite = "0.2.6" + +[dev-dependencies] +ntex = "0.3.14" diff --git a/ntex-util/LICENSE b/ntex-util/LICENSE new file mode 120000 index 00000000..ea5b6064 --- /dev/null +++ b/ntex-util/LICENSE @@ -0,0 +1 @@ +../LICENSE \ No newline at end of file diff --git a/ntex-util/src/channel/cell.rs b/ntex-util/src/channel/cell.rs new file mode 100644 index 00000000..ef6ecc60 --- /dev/null +++ b/ntex-util/src/channel/cell.rs @@ -0,0 +1,42 @@ +//! Custom cell impl + +use std::{cell::UnsafeCell, fmt, rc::Rc}; + +pub(super) struct Cell { + inner: Rc>, +} + +impl Clone for Cell { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl fmt::Debug for Cell { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.inner.fmt(f) + } +} + +impl Cell { + pub(super) fn new(inner: T) -> Self { + Self { + inner: Rc::new(UnsafeCell::new(inner)), + } + } + + pub(super) fn strong_count(&self) -> usize { + Rc::strong_count(&self.inner) + } + + pub(super) fn get_ref(&self) -> &T { + unsafe { &*self.inner.as_ref().get() } + } + + #[allow(clippy::mut_from_ref)] + pub(super) fn get_mut(&self) -> &mut T { + unsafe { &mut *self.inner.as_ref().get() } + } +} diff --git a/ntex/src/channel/condition.rs b/ntex-util/src/channel/condition.rs similarity index 84% rename from ntex/src/channel/condition.rs rename to ntex-util/src/channel/condition.rs index e1f0537a..56413686 100644 --- a/ntex/src/channel/condition.rs +++ b/ntex-util/src/channel/condition.rs @@ -1,6 +1,5 @@ -use std::{future::Future, pin::Pin, task::Context, task::Poll}; - use slab::Slab; +use std::{future::Future, pin::Pin, task::Context, task::Poll}; use super::cell::Cell; use crate::task::LocalWaker; @@ -69,12 +68,6 @@ impl Waiter { } Poll::Pending } - - #[doc(hidden)] - #[deprecated(since = "0.3.0")] - pub fn poll_waiter(&self, cx: &mut Context<'_>) -> Poll<()> { - self.poll_ready(cx) - } } impl Clone for Waiter { @@ -106,7 +99,7 @@ mod tests { use super::*; use crate::util::lazy; - #[crate::rt_test] + #[ntex::test] #[allow(clippy::unit_cmp)] async fn test_condition() { let cond = Condition::new(); @@ -134,8 +127,7 @@ mod tests { assert_eq!(waiter2.await, ()); } - #[crate::rt_test] - #[allow(deprecated)] + #[ntex::test] async fn test_condition_poll() { let cond = Condition::new(); let waiter = cond.wait(); @@ -143,13 +135,8 @@ mod tests { cond.notify(); assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(())); - let waiter = cond.wait(); - assert_eq!(lazy(|cx| waiter.poll_waiter(cx)).await, Poll::Pending); - let waiter2 = waiter.clone(); - assert_eq!(lazy(|cx| waiter2.poll_waiter(cx)).await, Poll::Pending); - drop(cond); - assert_eq!(lazy(|cx| waiter.poll_waiter(cx)).await, Poll::Ready(())); - assert_eq!(lazy(|cx| waiter2.poll_waiter(cx)).await, Poll::Ready(())); + assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(())); + assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(())); } } diff --git a/ntex-util/src/channel/mod.rs b/ntex-util/src/channel/mod.rs new file mode 100644 index 00000000..79b21749 --- /dev/null +++ b/ntex-util/src/channel/mod.rs @@ -0,0 +1,19 @@ +//! Communication primitives + +mod cell; +pub mod condition; +pub mod oneshot; +pub mod pool; + +/// Error returned from a [`Receiver`](Receiver) when the corresponding +/// [`Sender`](Sender) is dropped. +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub struct Canceled; + +impl std::fmt::Display for Canceled { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::write!(f, "oneshot canceled") + } +} + +impl std::error::Error for Canceled {} diff --git a/ntex/src/channel/oneshot.rs b/ntex-util/src/channel/oneshot.rs similarity index 100% rename from ntex/src/channel/oneshot.rs rename to ntex-util/src/channel/oneshot.rs diff --git a/ntex/src/channel/pool.rs b/ntex-util/src/channel/pool.rs similarity index 99% rename from ntex/src/channel/pool.rs rename to ntex-util/src/channel/pool.rs index 28750b5e..59b40353 100644 --- a/ntex/src/channel/pool.rs +++ b/ntex-util/src/channel/pool.rs @@ -1,7 +1,6 @@ //! A one-shot pool, futures-aware channel. -use std::{future::Future, pin::Pin, task::Context, task::Poll}; - use slab::Slab; +use std::{future::Future, pin::Pin, task::Context, task::Poll}; use super::{cell::Cell, Canceled}; use crate::task::LocalWaker; diff --git a/ntex-service/src/either.rs b/ntex-util/src/future/either.rs similarity index 100% rename from ntex-service/src/either.rs rename to ntex-util/src/future/either.rs diff --git a/ntex-util/src/future/join.rs b/ntex-util/src/future/join.rs new file mode 100644 index 00000000..502db58b --- /dev/null +++ b/ntex-util/src/future/join.rs @@ -0,0 +1,101 @@ +use std::{future::Future, mem, pin::Pin, task::Context, task::Poll}; + +use crate::future::MaybeDone; + +/// Future for the `join` combinator, waiting for two futures to complete. +pub async fn join(fut_a: A, fut_b: B) -> (A::Output, B::Output) +where + A: Future, + B: Future, +{ + Join { + fut_a: MaybeDone::Pending(fut_a), + fut_b: MaybeDone::Pending(fut_b), + } + .await +} + +pin_project_lite::pin_project! { + pub(crate) struct Join { + #[pin] + fut_a: MaybeDone, + #[pin] + fut_b: MaybeDone, + } +} + +impl Future for Join +where + A: Future, + B: Future, +{ + type Output = (A::Output, B::Output); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + let mut all_done = true; + all_done &= this.fut_a.as_mut().poll(cx).is_ready(); + all_done &= this.fut_b.as_mut().poll(cx).is_ready(); + if all_done { + Poll::Ready(( + this.fut_a.take_output().unwrap(), + this.fut_b.take_output().unwrap(), + )) + } else { + Poll::Pending + } + } +} + +/// Creates a future which represents a collection of the outputs of the futures +/// given. +pub async fn join_all(i: I) -> Vec<::Output> +where + I: IntoIterator, + I::Item: Future, +{ + let elems: Box<[_]> = i.into_iter().map(MaybeDone::Pending).collect(); + JoinAll { + elems: elems.into(), + } + .await +} + +pub(crate) struct JoinAll { + elems: Pin]>>, +} + +fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { + // Safety: `std` _could_ make this unsound if it were to decide Pin's + // invariants aren't required to transmit through slices. Otherwise this has + // the same safety as a normal field pin projection. + unsafe { slice.get_unchecked_mut() } + .iter_mut() + .map(|t| unsafe { Pin::new_unchecked(t) }) +} + +impl Future for JoinAll +where + T: Future, +{ + type Output = Vec; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut all_done = true; + + for elem in iter_pin_mut(self.elems.as_mut()) { + if elem.poll(cx).is_pending() { + all_done = false; + } + } + if all_done { + let mut elems = mem::replace(&mut self.elems, Box::pin([])); + let result = iter_pin_mut(elems.as_mut()) + .map(|e| e.take_output().unwrap()) + .collect(); + Poll::Ready(result) + } else { + Poll::Pending + } + } +} diff --git a/ntex-service/src/lazy.rs b/ntex-util/src/future/lazy.rs similarity index 100% rename from ntex-service/src/lazy.rs rename to ntex-util/src/future/lazy.rs diff --git a/ntex-util/src/future/mod.rs b/ntex-util/src/future/mod.rs new file mode 100644 index 00000000..4233d79a --- /dev/null +++ b/ntex-util/src/future/mod.rs @@ -0,0 +1,107 @@ +//! Utilities for futures +use std::{future::Future, mem, pin::Pin, task::Context, task::Poll}; + +pub use futures_core::Stream; +pub use futures_sink::Sink; + +mod either; +mod join; +mod lazy; +mod ready; +mod select; + +pub use self::either::Either; +pub use self::join::{join, join_all}; +pub use self::lazy::{lazy, Lazy}; +pub use self::ready::Ready; +pub use self::select::select; + +/// Creates a new future wrapping around a function returning [`Poll`]. +/// +/// Polling the returned future delegates to the wrapped function. +pub fn poll_fn(f: F) -> impl Future +where + F: FnMut(&mut Context<'_>) -> Poll, +{ + PollFn { f } +} + +/// Future for the [`poll_fn`] function. +#[must_use = "futures do nothing unless you `.await` or poll them"] +struct PollFn { + f: F, +} + +impl Unpin for PollFn {} + +impl Future for PollFn +where + F: FnMut(&mut Context<'_>) -> Poll, +{ + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + (&mut self.f)(cx) + } +} + +/// Creates a future that resolves to the next item in the stream. +pub async fn next(stream: &mut S) -> Option +where + S: Stream + Unpin, +{ + poll_fn(|cx| Pin::new(&mut *stream).poll_next(cx)).await +} + +/// A future that completes after the given item has been fully processed +/// into the sink, including flushing. +pub async fn send(sink: &mut S, item: I) -> Result<(), S::Error> +where + S: Sink + Unpin, +{ + poll_fn(|cx| Pin::new(&mut *sink).poll_ready(cx)).await?; + Pin::new(&mut *sink).start_send(item)?; + poll_fn(|cx| Pin::new(&mut *sink).poll_flush(cx)).await +} + +enum MaybeDone +where + F: Future, +{ + Pending(F), + Done(F::Output), + Gone, +} + +impl MaybeDone { + fn take_output(self: Pin<&mut Self>) -> Option { + match &*self { + Self::Done(_) => {} + Self::Pending(_) | Self::Gone => return None, + } + unsafe { + match mem::replace(self.get_unchecked_mut(), Self::Gone) { + MaybeDone::Done(output) => Some(output), + _ => unreachable!(), + } + } + } +} + +impl Future for MaybeDone { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + unsafe { + match self.as_mut().get_unchecked_mut() { + MaybeDone::Pending(f) => { + let res = futures_core::ready!(Pin::new_unchecked(f).poll(cx)); + self.set(Self::Done(res)); + } + MaybeDone::Done(_) => {} + MaybeDone::Gone => panic!("MaybeDone polled after value taken"), + } + } + Poll::Ready(()) + } +} diff --git a/ntex-service/src/ready.rs b/ntex-util/src/future/ready.rs similarity index 100% rename from ntex-service/src/ready.rs rename to ntex-util/src/future/ready.rs diff --git a/ntex-util/src/future/select.rs b/ntex-util/src/future/select.rs new file mode 100644 index 00000000..c3e70290 --- /dev/null +++ b/ntex-util/src/future/select.rs @@ -0,0 +1,43 @@ +use std::{future::Future, pin::Pin, task::Context, task::Poll}; + +use crate::future::Either; + +/// Waits for either one of two differently-typed futures to complete. +pub async fn select(fut_a: A, fut_b: B) -> Either +where + A: Future, + B: Future, +{ + Select { fut_a, fut_b }.await +} + +pin_project_lite::pin_project! { + pub(crate) struct Select { + #[pin] + fut_a: A, + #[pin] + fut_b: B, + } +} + +impl Future for Select +where + A: Future, + B: Future, +{ + type Output = Either; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + if let Poll::Ready(item) = this.fut_a.poll(cx) { + return Poll::Ready(Either::Left(item)); + } + + if let Poll::Ready(item) = this.fut_b.poll(cx) { + return Poll::Ready(Either::Right(item)); + } + + Poll::Pending + } +} diff --git a/ntex-util/src/lib.rs b/ntex-util/src/lib.rs new file mode 100644 index 00000000..9b7b34e7 --- /dev/null +++ b/ntex-util/src/lib.rs @@ -0,0 +1,7 @@ +//! Utilities for ntex framework +pub mod channel; +pub mod future; +pub mod task; + +pub use futures_core::{ready, Stream}; +pub use futures_sink::Sink; diff --git a/ntex/src/task.rs b/ntex-util/src/task.rs similarity index 100% rename from ntex/src/task.rs rename to ntex-util/src/task.rs diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 4e2f17d4..5d81bf7c 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -44,6 +44,7 @@ ntex-rt = "0.2.2" ntex-router = "0.4.2" ntex-service = "0.1.7" ntex-macros = "0.1.3" +ntex-util = "0.1.0" base64 = "0.13" bitflags = "1.2" diff --git a/ntex/src/channel/mod.rs b/ntex/src/channel/mod.rs index 243aea2a..0cdd95ea 100644 --- a/ntex/src/channel/mod.rs +++ b/ntex/src/channel/mod.rs @@ -1,20 +1,5 @@ //! Communication primitives mod cell; -pub mod condition; pub mod mpsc; -pub mod oneshot; -pub mod pool; - -/// Error returned from a [`Receiver`](Receiver) when the corresponding -/// [`Sender`](Sender) is dropped. -#[derive(Clone, Copy, PartialEq, Eq, Debug)] -pub struct Canceled; - -impl std::fmt::Display for Canceled { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::write!(f, "oneshot canceled") - } -} - -impl std::error::Error for Canceled {} +pub use ntex_util::channel::*; diff --git a/ntex/src/lib.rs b/ntex/src/lib.rs index 826d7c50..b9fc2996 100644 --- a/ntex/src/lib.rs +++ b/ntex/src/lib.rs @@ -38,7 +38,6 @@ pub mod connect; pub mod framed; pub mod http; pub mod server; -pub mod task; pub mod testing; pub mod util; pub mod web; @@ -48,6 +47,7 @@ pub use self::service::*; pub use futures_core::stream::Stream; pub use futures_sink::Sink; +pub use ntex_util::task; pub mod codec { //! Utilities for encoding and decoding frames. diff --git a/ntex/src/util/mod.rs b/ntex/src/util/mod.rs index 9a2dfefe..65cb7a8d 100644 --- a/ntex/src/util/mod.rs +++ b/ntex/src/util/mod.rs @@ -1,5 +1,3 @@ -use std::{future::Future, pin::Pin, task::Context, task::Poll}; - pub mod buffer; pub mod counter; mod extensions; @@ -13,162 +11,9 @@ pub mod variant; pub use self::extensions::Extensions; -pub use ntex_service::util::{lazy, Either, Lazy, Ready}; - pub use bytes::{Buf, BufMut, Bytes, BytesMut}; pub use bytestring::ByteString; +pub use ntex_util::future::*; pub type HashMap = std::collections::HashMap; pub type HashSet = std::collections::HashSet; - -/// Creates a new future wrapping around a function returning [`Poll`]. -/// -/// Polling the returned future delegates to the wrapped function. -pub fn poll_fn(f: F) -> impl Future -where - F: FnMut(&mut Context<'_>) -> Poll, -{ - PollFn { f } -} - -/// Future for the [`poll_fn`] function. -#[must_use = "futures do nothing unless you `.await` or poll them"] -struct PollFn { - f: F, -} - -impl Unpin for PollFn {} - -impl Future for PollFn -where - F: FnMut(&mut Context<'_>) -> Poll, -{ - type Output = T; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - (&mut self.f)(cx) - } -} - -/// Creates a future that resolves to the next item in the stream. -pub async fn next(stream: &mut S) -> Option -where - S: crate::Stream + Unpin, -{ - poll_fn(|cx| Pin::new(&mut *stream).poll_next(cx)).await -} - -/// A future that completes after the given item has been fully processed -/// into the sink, including flushing. -pub async fn send(sink: &mut S, item: I) -> Result<(), S::Error> -where - S: crate::Sink + Unpin, -{ - poll_fn(|cx| Pin::new(&mut *sink).poll_ready(cx)).await?; - Pin::new(&mut *sink).start_send(item)?; - poll_fn(|cx| Pin::new(&mut *sink).poll_flush(cx)).await -} - -/// Future for the `join` combinator, waiting for two futures to -/// complete. -pub async fn join(fut_a: A, fut_b: B) -> (A::Output, B::Output) -where - A: Future, - B: Future, -{ - tokio::pin!(fut_a); - tokio::pin!(fut_b); - - let mut res_a = None; - let mut res_b = None; - - poll_fn(|cx| { - if res_a.is_none() { - if let Poll::Ready(item) = Pin::new(&mut fut_a).poll(cx) { - res_a = Some(item) - } - } - if res_b.is_none() { - if let Poll::Ready(item) = Pin::new(&mut fut_b).poll(cx) { - res_b = Some(item) - } - } - if res_a.is_some() && res_b.is_some() { - Poll::Ready(()) - } else { - Poll::Pending - } - }) - .await; - - (res_a.unwrap(), res_b.unwrap()) -} - -/// Waits for either one of two differently-typed futures to complete. -pub async fn select(fut_a: A, fut_b: B) -> Either -where - A: Future, - B: Future, -{ - tokio::pin!(fut_a); - tokio::pin!(fut_b); - - poll_fn(|cx| { - if let Poll::Ready(item) = Pin::new(&mut fut_a).poll(cx) { - Poll::Ready(Either::Left(item)) - } else if let Poll::Ready(item) = Pin::new(&mut fut_b).poll(cx) { - Poll::Ready(Either::Right(item)) - } else { - Poll::Pending - } - }) - .await -} - -enum MaybeDone -where - T: Future, -{ - Pending(T), - Done(T::Output), -} - -/// Creates a future which represents a collection of the outputs of the futures given. -pub async fn join_all(fut: Vec) -> Vec -where - F: Future, -{ - let mut futs: Vec<_> = fut - .into_iter() - .map(|f| MaybeDone::Pending(Box::pin(f))) - .collect(); - - poll_fn(|cx| { - let mut pending = false; - for item in &mut futs { - if let MaybeDone::Pending(ref mut fut) = item { - if let Poll::Ready(res) = fut.as_mut().poll(cx) { - *item = MaybeDone::Done(res); - } else { - pending = true; - } - } - } - if pending { - Poll::Pending - } else { - Poll::Ready(()) - } - }) - .await; - - futs.into_iter() - .map(|item| { - if let MaybeDone::Done(item) = item { - item - } else { - unreachable!() - } - }) - .collect() -}