pin tokio to 0.2.6

This commit is contained in:
Nikolay Kim 2020-04-01 12:10:59 +06:00
parent 8bfabe6844
commit d2edbdf9f1
2 changed files with 126 additions and 7 deletions

View file

@ -75,11 +75,12 @@ serde_urlencoded = "0.6.1"
url = "2.1"
time = { version = "0.2.9", default-features = false, features = ["std"] }
coo-kie = { version = "0.13.3", package = "cookie", optional = true }
tokio = "0.2.6"
tokio = "=0.2.6"
# resolver
trust-dns-proto = { version = "0.19.3", features = ["tokio-runtime"] }
trust-dns-resolver = { version = "0.19.3", features = ["tokio-runtime"] }
trust-dns-proto = { version = "0.19.3", default-features = false }
trust-dns-resolver = { version = "0.19.3", default-features = false, features=["system-config"] }
async-trait = "0.1.27" # this is only for trust-dns
# FIXME: Remove it and use mio own uds feature once mio 0.7 is released
mio-uds = { version = "0.6.7" }

View file

@ -1,5 +1,6 @@
use std::cell::RefCell;
use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::pin::Pin;
@ -8,12 +9,18 @@ use std::task::{Context, Poll};
use futures::future::{ok, Either, FutureExt, LocalBoxFuture, Ready};
use futures::ready;
use trust_dns_proto::{error::ProtoError, Time};
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
use trust_dns_resolver::error::ResolveError;
use trust_dns_resolver::lookup_ip::LookupIp;
use trust_dns_resolver::TokioAsyncResolver;
use trust_dns_resolver::name_server::{
GenericConnection, GenericConnectionProvider, RuntimeProvider, Spawn,
};
use trust_dns_resolver::AsyncResolver as TAsyncResolver;
use crate::channel::condition::{Condition, Waiter};
use crate::rt::net::{self, TcpStream};
use crate::service::{Service, ServiceFactory};
use super::connect::{Address, Connect};
@ -199,6 +206,9 @@ impl AsyncResolver {
}
}
type TokioAsyncResolver =
TAsyncResolver<GenericConnection, GenericConnectionProvider<TokioRuntime>>;
enum AsyncResolverState {
New(ResolverConfig, ResolverOpts),
NewFromSystem,
@ -242,9 +252,10 @@ impl Future for LookupIpFuture {
match &mut *state {
AsyncResolverState::New(config, options) => {
this.fut = LookupIpState::Create(
TokioAsyncResolver::tokio(
TAsyncResolver::new(
config.clone(),
options.clone(),
Handle,
)
.boxed_local(),
);
@ -252,7 +263,7 @@ impl Future for LookupIpFuture {
}
AsyncResolverState::NewFromSystem => {
this.fut = LookupIpState::Create(
TokioAsyncResolver::tokio_from_system_conf()
TokioAsyncResolver::from_system_conf(Handle)
.boxed_local(),
);
*state = AsyncResolverState::Creating(Condition::default());
@ -262,7 +273,7 @@ impl Future for LookupIpFuture {
}
AsyncResolverState::Resolver(ref resolver) => {
let host = this.host.clone();
let resolver = resolver.clone();
let resolver: TokioAsyncResolver = Clone::clone(resolver);
this.fut = LookupIpState::Lookup(
async move { resolver.lookup_ip(host.as_str()).await }
@ -275,3 +286,110 @@ impl Future for LookupIpFuture {
}
}
}
#[derive(Clone, Copy)]
struct Handle;
impl Spawn for Handle {
fn spawn_bg<F>(&mut self, future: F)
where
F: Future<Output = Result<(), ProtoError>> + Send + 'static,
{
crate::rt::Arbiter::spawn(future.map(|_| ()));
}
}
struct UdpSocket(net::UdpSocket);
#[derive(Clone)]
struct TokioRuntime;
impl RuntimeProvider for TokioRuntime {
type Handle = Handle;
type Tcp = AsyncIo02As03<TcpStream>;
type Timer = TokioTime;
type Udp = UdpSocket;
}
/// Conversion from `tokio::io::{AsyncRead, AsyncWrite}` to `std::io::{AsyncRead, AsyncWrite}`
struct AsyncIo02As03<T>(T);
use crate::codec::{AsyncRead as AsyncRead02, AsyncWrite as AsyncWrite02};
use futures::io::{AsyncRead, AsyncWrite};
impl<T> Unpin for AsyncIo02As03<T> {}
impl<R: AsyncRead02 + Unpin> AsyncRead for AsyncIo02As03<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}
impl<W: AsyncWrite02 + Unpin> AsyncWrite for AsyncIo02As03<W> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.0).poll_write(cx, buf)
}
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.0).poll_flush(cx)
}
fn poll_close(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.0).poll_shutdown(cx)
}
}
#[async_trait::async_trait]
impl trust_dns_proto::tcp::Connect for AsyncIo02As03<TcpStream> {
type Transport = AsyncIo02As03<TcpStream>;
async fn connect(addr: SocketAddr) -> io::Result<Self::Transport> {
TcpStream::connect(&addr).await.map(AsyncIo02As03)
}
}
#[async_trait::async_trait]
impl trust_dns_proto::udp::UdpSocket for UdpSocket {
async fn bind(addr: &SocketAddr) -> io::Result<Self> {
net::UdpSocket::bind(addr).await.map(|sock| UdpSocket(sock))
}
async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.0.recv_from(buf).await
}
async fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
self.0.send_to(buf, target).await
}
}
/// New type which is implemented using tokio::time::{Delay, Timeout}
struct TokioTime;
#[async_trait::async_trait]
impl Time for TokioTime {
async fn delay_for(duration: std::time::Duration) {
tokio::time::delay_for(duration).await
}
async fn timeout<F: 'static + Future + Send>(
duration: std::time::Duration,
future: F,
) -> Result<F::Output, std::io::Error> {
tokio::time::timeout(duration, future)
.await
.map_err(move |_| {
std::io::Error::new(std::io::ErrorKind::TimedOut, "future timed out")
})
}
}