diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 52c94e41..c4dc4113 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -75,11 +75,12 @@ serde_urlencoded = "0.6.1" url = "2.1" time = { version = "0.2.9", default-features = false, features = ["std"] } coo-kie = { version = "0.13.3", package = "cookie", optional = true } -tokio = "0.2.6" +tokio = "=0.2.6" # resolver -trust-dns-proto = { version = "0.19.3", features = ["tokio-runtime"] } -trust-dns-resolver = { version = "0.19.3", features = ["tokio-runtime"] } +trust-dns-proto = { version = "0.19.3", default-features = false } +trust-dns-resolver = { version = "0.19.3", default-features = false, features=["system-config"] } +async-trait = "0.1.27" # this is only for trust-dns # FIXME: Remove it and use mio own uds feature once mio 0.7 is released mio-uds = { version = "0.6.7" } diff --git a/ntex/src/connect/resolve.rs b/ntex/src/connect/resolve.rs index 7bb39c78..4d5a3518 100644 --- a/ntex/src/connect/resolve.rs +++ b/ntex/src/connect/resolve.rs @@ -1,5 +1,6 @@ use std::cell::RefCell; use std::future::Future; +use std::io; use std::marker::PhantomData; use std::net::SocketAddr; use std::pin::Pin; @@ -8,12 +9,18 @@ use std::task::{Context, Poll}; use futures::future::{ok, Either, FutureExt, LocalBoxFuture, Ready}; use futures::ready; + +use trust_dns_proto::{error::ProtoError, Time}; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; use trust_dns_resolver::error::ResolveError; use trust_dns_resolver::lookup_ip::LookupIp; -use trust_dns_resolver::TokioAsyncResolver; +use trust_dns_resolver::name_server::{ + GenericConnection, GenericConnectionProvider, RuntimeProvider, Spawn, +}; +use trust_dns_resolver::AsyncResolver as TAsyncResolver; use crate::channel::condition::{Condition, Waiter}; +use crate::rt::net::{self, TcpStream}; use crate::service::{Service, ServiceFactory}; use super::connect::{Address, Connect}; @@ -199,6 +206,9 @@ impl AsyncResolver { } } +type TokioAsyncResolver = + TAsyncResolver>; + enum AsyncResolverState { New(ResolverConfig, ResolverOpts), NewFromSystem, @@ -242,9 +252,10 @@ impl Future for LookupIpFuture { match &mut *state { AsyncResolverState::New(config, options) => { this.fut = LookupIpState::Create( - TokioAsyncResolver::tokio( + TAsyncResolver::new( config.clone(), options.clone(), + Handle, ) .boxed_local(), ); @@ -252,7 +263,7 @@ impl Future for LookupIpFuture { } AsyncResolverState::NewFromSystem => { this.fut = LookupIpState::Create( - TokioAsyncResolver::tokio_from_system_conf() + TokioAsyncResolver::from_system_conf(Handle) .boxed_local(), ); *state = AsyncResolverState::Creating(Condition::default()); @@ -262,7 +273,7 @@ impl Future for LookupIpFuture { } AsyncResolverState::Resolver(ref resolver) => { let host = this.host.clone(); - let resolver = resolver.clone(); + let resolver: TokioAsyncResolver = Clone::clone(resolver); this.fut = LookupIpState::Lookup( async move { resolver.lookup_ip(host.as_str()).await } @@ -275,3 +286,110 @@ impl Future for LookupIpFuture { } } } + +#[derive(Clone, Copy)] +struct Handle; + +impl Spawn for Handle { + fn spawn_bg(&mut self, future: F) + where + F: Future> + Send + 'static, + { + crate::rt::Arbiter::spawn(future.map(|_| ())); + } +} + +struct UdpSocket(net::UdpSocket); + +#[derive(Clone)] +struct TokioRuntime; +impl RuntimeProvider for TokioRuntime { + type Handle = Handle; + type Tcp = AsyncIo02As03; + type Timer = TokioTime; + type Udp = UdpSocket; +} + +/// Conversion from `tokio::io::{AsyncRead, AsyncWrite}` to `std::io::{AsyncRead, AsyncWrite}` +struct AsyncIo02As03(T); + +use crate::codec::{AsyncRead as AsyncRead02, AsyncWrite as AsyncWrite02}; +use futures::io::{AsyncRead, AsyncWrite}; + +impl Unpin for AsyncIo02As03 {} +impl AsyncRead for AsyncIo02As03 { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_read(cx, buf) + } +} + +impl AsyncWrite for AsyncIo02As03 { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_write(cx, buf) + } + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.0).poll_flush(cx) + } + fn poll_close( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.0).poll_shutdown(cx) + } +} + +#[async_trait::async_trait] +impl trust_dns_proto::tcp::Connect for AsyncIo02As03 { + type Transport = AsyncIo02As03; + + async fn connect(addr: SocketAddr) -> io::Result { + TcpStream::connect(&addr).await.map(AsyncIo02As03) + } +} + +#[async_trait::async_trait] +impl trust_dns_proto::udp::UdpSocket for UdpSocket { + async fn bind(addr: &SocketAddr) -> io::Result { + net::UdpSocket::bind(addr).await.map(|sock| UdpSocket(sock)) + } + + async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.0.recv_from(buf).await + } + + async fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result { + self.0.send_to(buf, target).await + } +} + +/// New type which is implemented using tokio::time::{Delay, Timeout} +struct TokioTime; + +#[async_trait::async_trait] +impl Time for TokioTime { + async fn delay_for(duration: std::time::Duration) { + tokio::time::delay_for(duration).await + } + + async fn timeout( + duration: std::time::Duration, + future: F, + ) -> Result { + tokio::time::timeout(duration, future) + .await + .map_err(move |_| { + std::io::Error::new(std::io::ErrorKind::TimedOut, "future timed out") + }) + } +}