move low level utils to separate crate

This commit is contained in:
Nikolay Kim 2021-04-04 01:20:18 +06:00
parent 8b3655457b
commit 91ee6762e7
39 changed files with 466 additions and 230 deletions

View file

@ -6,6 +6,7 @@ members = [
"ntex-rt",
"ntex-service",
"ntex-macros",
"ntex-util",
]
[patch.crates-io]
@ -14,4 +15,5 @@ ntex-codec = { path = "ntex-codec" }
ntex-router = { path = "ntex-router" }
ntex-rt = { path = "ntex-rt" }
ntex-service = { path = "ntex-service" }
ntex-macros = { path = "ntex-macros" }
ntex-macros = { path = "ntex-macros" }
ntex-util = { path = "ntex-util" }

View file

@ -18,9 +18,7 @@ path = "src/lib.rs"
[dependencies]
bitflags = "1.2.1"
bytes = "1.0"
ntex-service = "0.1.7"
futures-core = { version = "0.3.13", default-features = false, features = ["alloc"] }
futures-sink = { version = "0.3.13", default-features = false, features = ["alloc"] }
ntex-util = "0.1.0"
log = "0.4"
tokio = { version = "1", default-features = false }

View file

@ -3,9 +3,7 @@ use std::task::{Context, Poll};
use std::{fmt, io};
use bytes::{Buf, BytesMut};
use futures_core::{ready, Stream};
use futures_sink::Sink;
use ntex_service::util::Either;
use ntex_util::{future::Either, ready, Sink, Stream};
use crate::{AsyncRead, AsyncWrite, Decoder, Encoder};

View file

@ -16,5 +16,5 @@ name = "ntex_rt"
path = "src/lib.rs"
[dependencies]
ntex-service = "0.1.7"
ntex-util = "0.1.0"
tokio = { version = "1", default-features = false, features = ["rt", "net", "time", "signal", "sync"] }

View file

@ -1,6 +1,6 @@
use std::{borrow::Cow, future::Future, io};
use ntex_service::util::lazy;
use ntex_util::future::lazy;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::oneshot::{channel, Receiver};
use tokio::task::LocalSet;

View file

@ -1,5 +1,5 @@
//! A runtime implementation that runs everything on the current thread.
use ntex_service::util::lazy;
use ntex_util::future::lazy;
use std::future::Future;
mod arbiter;

View file

@ -16,6 +16,7 @@ name = "ntex_service"
path = "src/lib.rs"
[dependencies]
ntex-util = "0.1.0"
pin-project-lite = "0.2.4"
[dev-dependencies]

View file

@ -271,8 +271,8 @@ where
mod tests {
use std::{cell::Cell, rc::Rc, task::Context, task::Poll};
use crate::util::{lazy, Ready};
use crate::{fn_factory, pipeline, pipeline_factory, Service, ServiceFactory};
use ntex_util::future::{lazy, Ready};
struct Srv1(Rc<Cell<usize>>);

View file

@ -292,8 +292,8 @@ where
mod tests {
use super::*;
use crate::util::{lazy, Ready};
use crate::{fn_service, pipeline, pipeline_factory, Service, ServiceFactory};
use ntex_util::future::{lazy, Ready};
#[derive(Clone)]
struct Srv;

View file

@ -211,12 +211,11 @@ where
#[cfg(test)]
mod tests {
use ntex_util::future::{lazy, Ready};
use std::task::{Context, Poll};
use super::*;
use crate::{
pipeline, pipeline_factory, util::lazy, util::Ready, Service, ServiceFactory,
};
use crate::{pipeline, pipeline_factory, Service, ServiceFactory};
#[derive(Clone)]
struct Srv;

View file

@ -227,10 +227,11 @@ where
#[cfg(test)]
mod tests {
use ntex_util::future::Ready;
use std::{cell::Cell, rc::Rc};
use super::*;
use crate::{fn_service, util::Ready, Service};
use crate::{fn_service, Service};
#[ntex::test]
async fn test_apply() {

View file

@ -1,7 +1,9 @@
use std::task::{Context, Poll};
use std::{cell::Cell, cell::RefCell, future::Future, marker::PhantomData};
use crate::{util::Ready, IntoService, IntoServiceFactory, Service, ServiceFactory};
use ntex_util::future::Ready;
use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory};
#[inline]
/// Create `ServiceFactory` for function that can act as a `Service`
@ -523,10 +525,11 @@ where
#[cfg(test)]
mod tests {
use ntex_util::future::lazy;
use std::{rc::Rc, task::Poll};
use super::*;
use crate::{util::lazy, Service, ServiceFactory};
use crate::{Service, ServiceFactory};
#[ntex::test]
async fn test_fn_service() {

View file

@ -1,6 +1,7 @@
use ntex_util::future::Ready;
use std::{future::Future, marker::PhantomData};
use crate::{apply_fn, dev::Apply, util::Ready, Service, Transform};
use crate::{apply_fn, dev::Apply, Service, Transform};
/// Use function as transform service
pub fn fn_transform<S, F, R, Req, Res, Err>(
@ -67,10 +68,11 @@ where
#[cfg(test)]
#[allow(clippy::redundant_clone)]
mod tests {
use ntex_util::future::lazy;
use std::task::{Context, Poll};
use super::*;
use crate::{util::lazy, Service};
use crate::Service;
#[derive(Clone)]
struct Srv;

View file

@ -21,10 +21,6 @@ mod then;
mod transform;
mod transform_err;
mod either;
mod lazy;
mod ready;
pub use self::apply::{apply_fn, apply_fn_factory};
pub use self::fn_service::{
fn_factory, fn_factory_with_config, fn_mut_service, fn_service,
@ -341,12 +337,6 @@ where
tp.into_service()
}
pub mod util {
pub use crate::either::Either;
pub use crate::lazy::{lazy, Lazy};
pub use crate::ready::Ready;
}
pub mod dev {
pub use crate::apply::{Apply, ApplyServiceFactory};
pub use crate::fn_service::{

View file

@ -206,8 +206,10 @@ where
#[cfg(test)]
mod tests {
use ntex_util::future::{lazy, Ready};
use super::*;
use crate::{util::lazy, util::Ready, IntoServiceFactory, Service, ServiceFactory};
use crate::{IntoServiceFactory, Service, ServiceFactory};
#[derive(Clone)]
struct Srv;

View file

@ -304,10 +304,11 @@ where
#[cfg(test)]
#[allow(clippy::redundant_closure)]
mod tests {
use ntex_util::future::Ready;
use std::{cell::Cell, rc::Rc};
use super::*;
use crate::{fn_factory_with_config, fn_service, util::Ready, ServiceFactory};
use crate::{fn_factory_with_config, fn_service, ServiceFactory};
#[ntex::test]
async fn test_map_config() {

View file

@ -208,8 +208,8 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::util::{lazy, Ready};
use crate::{IntoServiceFactory, Service, ServiceFactory};
use ntex_util::future::{lazy, Ready};
#[derive(Clone)]
struct Srv;

View file

@ -256,11 +256,10 @@ where
#[cfg(test)]
mod tests {
use ntex_util::future::{lazy, Ready};
use std::{cell::Cell, rc::Rc, task::Context, task::Poll};
use crate::{
pipeline, pipeline_factory, util::lazy, util::Ready, Service, ServiceFactory,
};
use crate::{pipeline, pipeline_factory, Service, ServiceFactory};
#[derive(Clone)]
struct Srv1(Rc<Cell<usize>>);

View file

@ -233,8 +233,10 @@ where
#[cfg(test)]
#[allow(clippy::redundant_clone)]
mod tests {
use ntex_util::future::{lazy, Ready};
use super::*;
use crate::{fn_service, util::lazy, util::Ready, Service, ServiceFactory};
use crate::{fn_service, Service, ServiceFactory};
#[derive(Clone)]
struct Tr;

75
ntex-util/CHANGES.md Normal file
View file

@ -0,0 +1,75 @@
# Changes
## [0.4.1] - 2021-04-04
* Use Either from ntex-service
## [0.4.0] - 2021-02-23
* Migrate to tokio 1.x
## [0.3.0] - 2021-02-20
* Make Encoder and Decoder methods immutable
## [0.2.2] - 2021-01-21
* Flush underlying io stream
## [0.2.1] - 2020-08-10
* Require `Debug` impl for `Error`
## [0.2.0] - 2020-08-10
* Include custom `Encoder` and `Decoder` traits
* Remove `From<io::Error>` constraint from `Encoder` and `Decoder` traits
## [0.1.2] - 2020-04-17
* Do not swallow unprocessed data on read errors
## [0.1.1] - 2020-04-07
* Optimize io operations
* Fix framed close method
## [0.1.0] - 2020-03-31
* Fork crate to ntex namespace
* Use `.advance()` intead of `.split_to()`
* Add Unpin constraint and remove unneeded unsafe
## [0.2.0] - 2019-12-10
* Use specific futures dependencies
## [0.2.0-alpha.4]
* Fix buffer remaining capacity calcualtion
## [0.2.0-alpha.3]
* Use tokio 0.2
* Fix low/high watermark for write/read buffers
## [0.2.0-alpha.2]
* Migrated to `std::future`
## [0.1.2] - 2019-03-27
* Added `Framed::map_io()` method.
## [0.1.1] - 2019-03-06
* Added `FramedParts::with_read_buffer()` method.
## [0.1.0] - 2018-12-09
* Move codec to separate crate

26
ntex-util/Cargo.toml Normal file
View file

@ -0,0 +1,26 @@
[package]
name = "ntex-util"
version = "0.1.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for ntex framework"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://ntex.rs"
repository = "https://github.com/ntex-rs/ntex.git"
documentation = "https://docs.rs/ntex-util/"
categories = ["network-programming", "asynchronous"]
license = "MIT"
edition = "2018"
[lib]
name = "ntex_util"
path = "src/lib.rs"
[dependencies]
bitflags = "1.2"
slab = "0.4"
futures-core = { version = "0.3.13", default-features = false, features = ["alloc"] }
futures-sink = { version = "0.3.13", default-features = false, features = ["alloc"] }
pin-project-lite = "0.2.6"
[dev-dependencies]
ntex = "0.3.14"

1
ntex-util/LICENSE Symbolic link
View file

@ -0,0 +1 @@
../LICENSE

View file

@ -0,0 +1,42 @@
//! Custom cell impl
use std::{cell::UnsafeCell, fmt, rc::Rc};
pub(super) struct Cell<T> {
inner: Rc<UnsafeCell<T>>,
}
impl<T> Clone for Cell<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T: fmt::Debug> fmt::Debug for Cell<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(f)
}
}
impl<T> Cell<T> {
pub(super) fn new(inner: T) -> Self {
Self {
inner: Rc::new(UnsafeCell::new(inner)),
}
}
pub(super) fn strong_count(&self) -> usize {
Rc::strong_count(&self.inner)
}
pub(super) fn get_ref(&self) -> &T {
unsafe { &*self.inner.as_ref().get() }
}
#[allow(clippy::mut_from_ref)]
pub(super) fn get_mut(&self) -> &mut T {
unsafe { &mut *self.inner.as_ref().get() }
}
}

View file

@ -1,6 +1,5 @@
use std::{future::Future, pin::Pin, task::Context, task::Poll};
use slab::Slab;
use std::{future::Future, pin::Pin, task::Context, task::Poll};
use super::cell::Cell;
use crate::task::LocalWaker;
@ -69,12 +68,6 @@ impl Waiter {
}
Poll::Pending
}
#[doc(hidden)]
#[deprecated(since = "0.3.0")]
pub fn poll_waiter(&self, cx: &mut Context<'_>) -> Poll<()> {
self.poll_ready(cx)
}
}
impl Clone for Waiter {
@ -106,7 +99,7 @@ mod tests {
use super::*;
use crate::util::lazy;
#[crate::rt_test]
#[ntex::test]
#[allow(clippy::unit_cmp)]
async fn test_condition() {
let cond = Condition::new();
@ -134,8 +127,7 @@ mod tests {
assert_eq!(waiter2.await, ());
}
#[crate::rt_test]
#[allow(deprecated)]
#[ntex::test]
async fn test_condition_poll() {
let cond = Condition::new();
let waiter = cond.wait();
@ -143,13 +135,8 @@ mod tests {
cond.notify();
assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
let waiter = cond.wait();
assert_eq!(lazy(|cx| waiter.poll_waiter(cx)).await, Poll::Pending);
let waiter2 = waiter.clone();
assert_eq!(lazy(|cx| waiter2.poll_waiter(cx)).await, Poll::Pending);
drop(cond);
assert_eq!(lazy(|cx| waiter.poll_waiter(cx)).await, Poll::Ready(()));
assert_eq!(lazy(|cx| waiter2.poll_waiter(cx)).await, Poll::Ready(()));
assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(()));
}
}

View file

@ -0,0 +1,19 @@
//! Communication primitives
mod cell;
pub mod condition;
pub mod oneshot;
pub mod pool;
/// Error returned from a [`Receiver`](Receiver) when the corresponding
/// [`Sender`](Sender) is dropped.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct Canceled;
impl std::fmt::Display for Canceled {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::write!(f, "oneshot canceled")
}
}
impl std::error::Error for Canceled {}

View file

@ -1,7 +1,6 @@
//! A one-shot pool, futures-aware channel.
use std::{future::Future, pin::Pin, task::Context, task::Poll};
use slab::Slab;
use std::{future::Future, pin::Pin, task::Context, task::Poll};
use super::{cell::Cell, Canceled};
use crate::task::LocalWaker;

View file

@ -0,0 +1,101 @@
use std::{future::Future, mem, pin::Pin, task::Context, task::Poll};
use crate::future::MaybeDone;
/// Future for the `join` combinator, waiting for two futures to complete.
pub async fn join<A, B>(fut_a: A, fut_b: B) -> (A::Output, B::Output)
where
A: Future,
B: Future,
{
Join {
fut_a: MaybeDone::Pending(fut_a),
fut_b: MaybeDone::Pending(fut_b),
}
.await
}
pin_project_lite::pin_project! {
pub(crate) struct Join<A: Future, B: Future> {
#[pin]
fut_a: MaybeDone<A>,
#[pin]
fut_b: MaybeDone<B>,
}
}
impl<A, B> Future for Join<A, B>
where
A: Future,
B: Future,
{
type Output = (A::Output, B::Output);
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let mut all_done = true;
all_done &= this.fut_a.as_mut().poll(cx).is_ready();
all_done &= this.fut_b.as_mut().poll(cx).is_ready();
if all_done {
Poll::Ready((
this.fut_a.take_output().unwrap(),
this.fut_b.take_output().unwrap(),
))
} else {
Poll::Pending
}
}
}
/// Creates a future which represents a collection of the outputs of the futures
/// given.
pub async fn join_all<I>(i: I) -> Vec<<I::Item as Future>::Output>
where
I: IntoIterator,
I::Item: Future,
{
let elems: Box<[_]> = i.into_iter().map(MaybeDone::Pending).collect();
JoinAll {
elems: elems.into(),
}
.await
}
pub(crate) struct JoinAll<T: Future> {
elems: Pin<Box<[MaybeDone<T>]>>,
}
fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
// Safety: `std` _could_ make this unsound if it were to decide Pin's
// invariants aren't required to transmit through slices. Otherwise this has
// the same safety as a normal field pin projection.
unsafe { slice.get_unchecked_mut() }
.iter_mut()
.map(|t| unsafe { Pin::new_unchecked(t) })
}
impl<T> Future for JoinAll<T>
where
T: Future,
{
type Output = Vec<T::Output>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut all_done = true;
for elem in iter_pin_mut(self.elems.as_mut()) {
if elem.poll(cx).is_pending() {
all_done = false;
}
}
if all_done {
let mut elems = mem::replace(&mut self.elems, Box::pin([]));
let result = iter_pin_mut(elems.as_mut())
.map(|e| e.take_output().unwrap())
.collect();
Poll::Ready(result)
} else {
Poll::Pending
}
}
}

107
ntex-util/src/future/mod.rs Normal file
View file

@ -0,0 +1,107 @@
//! Utilities for futures
use std::{future::Future, mem, pin::Pin, task::Context, task::Poll};
pub use futures_core::Stream;
pub use futures_sink::Sink;
mod either;
mod join;
mod lazy;
mod ready;
mod select;
pub use self::either::Either;
pub use self::join::{join, join_all};
pub use self::lazy::{lazy, Lazy};
pub use self::ready::Ready;
pub use self::select::select;
/// Creates a new future wrapping around a function returning [`Poll`].
///
/// Polling the returned future delegates to the wrapped function.
pub fn poll_fn<T, F>(f: F) -> impl Future<Output = T>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
PollFn { f }
}
/// Future for the [`poll_fn`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
struct PollFn<F> {
f: F,
}
impl<F> Unpin for PollFn<F> {}
impl<T, F> Future for PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
(&mut self.f)(cx)
}
}
/// Creates a future that resolves to the next item in the stream.
pub async fn next<S>(stream: &mut S) -> Option<S::Item>
where
S: Stream + Unpin,
{
poll_fn(|cx| Pin::new(&mut *stream).poll_next(cx)).await
}
/// A future that completes after the given item has been fully processed
/// into the sink, including flushing.
pub async fn send<S, I>(sink: &mut S, item: I) -> Result<(), S::Error>
where
S: Sink<I> + Unpin,
{
poll_fn(|cx| Pin::new(&mut *sink).poll_ready(cx)).await?;
Pin::new(&mut *sink).start_send(item)?;
poll_fn(|cx| Pin::new(&mut *sink).poll_flush(cx)).await
}
enum MaybeDone<F>
where
F: Future,
{
Pending(F),
Done(F::Output),
Gone,
}
impl<F: Future> MaybeDone<F> {
fn take_output(self: Pin<&mut Self>) -> Option<F::Output> {
match &*self {
Self::Done(_) => {}
Self::Pending(_) | Self::Gone => return None,
}
unsafe {
match mem::replace(self.get_unchecked_mut(), Self::Gone) {
MaybeDone::Done(output) => Some(output),
_ => unreachable!(),
}
}
}
}
impl<F: Future> Future for MaybeDone<F> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
unsafe {
match self.as_mut().get_unchecked_mut() {
MaybeDone::Pending(f) => {
let res = futures_core::ready!(Pin::new_unchecked(f).poll(cx));
self.set(Self::Done(res));
}
MaybeDone::Done(_) => {}
MaybeDone::Gone => panic!("MaybeDone polled after value taken"),
}
}
Poll::Ready(())
}
}

View file

@ -0,0 +1,43 @@
use std::{future::Future, pin::Pin, task::Context, task::Poll};
use crate::future::Either;
/// Waits for either one of two differently-typed futures to complete.
pub async fn select<A, B>(fut_a: A, fut_b: B) -> Either<A::Output, B::Output>
where
A: Future,
B: Future,
{
Select { fut_a, fut_b }.await
}
pin_project_lite::pin_project! {
pub(crate) struct Select<A, B> {
#[pin]
fut_a: A,
#[pin]
fut_b: B,
}
}
impl<A, B> Future for Select<A, B>
where
A: Future,
B: Future,
{
type Output = Either<A::Output, B::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(item) = this.fut_a.poll(cx) {
return Poll::Ready(Either::Left(item));
}
if let Poll::Ready(item) = this.fut_b.poll(cx) {
return Poll::Ready(Either::Right(item));
}
Poll::Pending
}
}

7
ntex-util/src/lib.rs Normal file
View file

@ -0,0 +1,7 @@
//! Utilities for ntex framework
pub mod channel;
pub mod future;
pub mod task;
pub use futures_core::{ready, Stream};
pub use futures_sink::Sink;

View file

@ -44,6 +44,7 @@ ntex-rt = "0.2.2"
ntex-router = "0.4.2"
ntex-service = "0.1.7"
ntex-macros = "0.1.3"
ntex-util = "0.1.0"
base64 = "0.13"
bitflags = "1.2"

View file

@ -1,20 +1,5 @@
//! Communication primitives
mod cell;
pub mod condition;
pub mod mpsc;
pub mod oneshot;
pub mod pool;
/// Error returned from a [`Receiver`](Receiver) when the corresponding
/// [`Sender`](Sender) is dropped.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct Canceled;
impl std::fmt::Display for Canceled {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::write!(f, "oneshot canceled")
}
}
impl std::error::Error for Canceled {}
pub use ntex_util::channel::*;

View file

@ -38,7 +38,6 @@ pub mod connect;
pub mod framed;
pub mod http;
pub mod server;
pub mod task;
pub mod testing;
pub mod util;
pub mod web;
@ -48,6 +47,7 @@ pub use self::service::*;
pub use futures_core::stream::Stream;
pub use futures_sink::Sink;
pub use ntex_util::task;
pub mod codec {
//! Utilities for encoding and decoding frames.

View file

@ -1,5 +1,3 @@
use std::{future::Future, pin::Pin, task::Context, task::Poll};
pub mod buffer;
pub mod counter;
mod extensions;
@ -13,162 +11,9 @@ pub mod variant;
pub use self::extensions::Extensions;
pub use ntex_service::util::{lazy, Either, Lazy, Ready};
pub use bytes::{Buf, BufMut, Bytes, BytesMut};
pub use bytestring::ByteString;
pub use ntex_util::future::*;
pub type HashMap<K, V> = std::collections::HashMap<K, V, ahash::RandomState>;
pub type HashSet<V> = std::collections::HashSet<V, ahash::RandomState>;
/// Creates a new future wrapping around a function returning [`Poll`].
///
/// Polling the returned future delegates to the wrapped function.
pub fn poll_fn<T, F>(f: F) -> impl Future<Output = T>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
PollFn { f }
}
/// Future for the [`poll_fn`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
struct PollFn<F> {
f: F,
}
impl<F> Unpin for PollFn<F> {}
impl<T, F> Future for PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
(&mut self.f)(cx)
}
}
/// Creates a future that resolves to the next item in the stream.
pub async fn next<S>(stream: &mut S) -> Option<S::Item>
where
S: crate::Stream + Unpin,
{
poll_fn(|cx| Pin::new(&mut *stream).poll_next(cx)).await
}
/// A future that completes after the given item has been fully processed
/// into the sink, including flushing.
pub async fn send<S, I>(sink: &mut S, item: I) -> Result<(), S::Error>
where
S: crate::Sink<I> + Unpin,
{
poll_fn(|cx| Pin::new(&mut *sink).poll_ready(cx)).await?;
Pin::new(&mut *sink).start_send(item)?;
poll_fn(|cx| Pin::new(&mut *sink).poll_flush(cx)).await
}
/// Future for the `join` combinator, waiting for two futures to
/// complete.
pub async fn join<A, B>(fut_a: A, fut_b: B) -> (A::Output, B::Output)
where
A: Future,
B: Future,
{
tokio::pin!(fut_a);
tokio::pin!(fut_b);
let mut res_a = None;
let mut res_b = None;
poll_fn(|cx| {
if res_a.is_none() {
if let Poll::Ready(item) = Pin::new(&mut fut_a).poll(cx) {
res_a = Some(item)
}
}
if res_b.is_none() {
if let Poll::Ready(item) = Pin::new(&mut fut_b).poll(cx) {
res_b = Some(item)
}
}
if res_a.is_some() && res_b.is_some() {
Poll::Ready(())
} else {
Poll::Pending
}
})
.await;
(res_a.unwrap(), res_b.unwrap())
}
/// Waits for either one of two differently-typed futures to complete.
pub async fn select<A, B>(fut_a: A, fut_b: B) -> Either<A::Output, B::Output>
where
A: Future,
B: Future,
{
tokio::pin!(fut_a);
tokio::pin!(fut_b);
poll_fn(|cx| {
if let Poll::Ready(item) = Pin::new(&mut fut_a).poll(cx) {
Poll::Ready(Either::Left(item))
} else if let Poll::Ready(item) = Pin::new(&mut fut_b).poll(cx) {
Poll::Ready(Either::Right(item))
} else {
Poll::Pending
}
})
.await
}
enum MaybeDone<T>
where
T: Future,
{
Pending(T),
Done(T::Output),
}
/// Creates a future which represents a collection of the outputs of the futures given.
pub async fn join_all<F, T>(fut: Vec<F>) -> Vec<T>
where
F: Future<Output = T>,
{
let mut futs: Vec<_> = fut
.into_iter()
.map(|f| MaybeDone::Pending(Box::pin(f)))
.collect();
poll_fn(|cx| {
let mut pending = false;
for item in &mut futs {
if let MaybeDone::Pending(ref mut fut) = item {
if let Poll::Ready(res) = fut.as_mut().poll(cx) {
*item = MaybeDone::Done(res);
} else {
pending = true;
}
}
}
if pending {
Poll::Pending
} else {
Poll::Ready(())
}
})
.await;
futs.into_iter()
.map(|item| {
if let MaybeDone::Done(item) = item {
item
} else {
unreachable!()
}
})
.collect()
}