Rewrite without async/await

This commit is contained in:
Frank Denis 2018-02-10 16:39:07 +01:00
parent ce48cec32c
commit f73fc5539b
2 changed files with 50 additions and 43 deletions

View file

@ -12,7 +12,7 @@ categories = ["asynchronous", "network-programming","command-line-utilities"]
[dependencies]
base64 = "~0.9"
clap = "~2"
futures-await = "~0.1"
futures = "~0.1"
hyper = "~0.11"
tokio = "~0.1"
tokio-io = "~0.1"

View file

@ -5,7 +5,7 @@
extern crate base64;
extern crate clap;
extern crate futures_await as futures;
extern crate futures;
extern crate hyper;
extern crate tokio;
extern crate tokio_io;
@ -128,52 +128,59 @@ impl DoH {
Box::new(future::ok(response))
}
#[async]
fn proxy(query: Vec<u8>) -> Result<Response, ()> {
fn proxy(query: Vec<u8>) -> impl Future<Item = Response, Error = ()> {
let local_addr = LOCAL_BIND_ADDRESS.parse().unwrap();
let socket = UdpSocket::bind(&local_addr).unwrap();
let remote_addr = SERVER_ADDRESS.parse().unwrap();
let (socket, _) = await!(socket.send_dgram(query, &remote_addr)).map_err(|_| ())?;
let mut packet = vec![0; MAX_DNS_RESPONSE_LEN];
let (_socket, mut packet, len, server_addr) =
await!(socket.recv_dgram(packet)).map_err(|_| ())?;
if len < MIN_DNS_PACKET_LEN || server_addr != remote_addr {
return Err(());
}
packet.truncate(len);
let min_ttl = dns::min_ttl(&packet, MIN_TTL, MAX_TTL, ERR_TTL).map_err(|_| {})?;
Ok((packet, min_ttl)).map(|(body, ttl)| {
let body_len = body.len();
let mut response = Response::new();
response.set_body(body);
response
.with_header(ContentLength(body_len as u64))
.with_header(ContentType(
"application/dns-udpwireformat".parse().unwrap(),
))
.with_header(CacheControl(vec![CacheDirective::MaxAge(ttl)]))
})
let remote_addr: SocketAddr = SERVER_ADDRESS.parse().unwrap();
let fut = socket
.send_dgram(query, &remote_addr)
.map_err(|_| ())
.and_then(move |(socket, _)| {
let packet = vec![0; MAX_DNS_RESPONSE_LEN];
socket.recv_dgram(packet).map_err(|_| {})
})
.and_then(move |(_socket, mut packet, len, server_addr)| {
if len < MIN_DNS_PACKET_LEN || server_addr != remote_addr {
return future::err(());
}
packet.truncate(len);
let ttl = match dns::min_ttl(&packet, MIN_TTL, MAX_TTL, ERR_TTL) {
Err(_) => return future::err(()),
Ok(min_ttl) => min_ttl,
};
let packet_len = packet.len();
let mut response = Response::new();
response.set_body(packet);
let response = response
.with_header(ContentLength(packet_len as u64))
.with_header(ContentType(
"application/dns-udpwireformat".parse().unwrap(),
))
.with_header(CacheControl(vec![CacheDirective::MaxAge(ttl)]));
future::ok(response)
});
fut
}
#[async]
fn read_body_and_proxy(&self, body: Body) -> Result<Response, ()> {
let query = await!({
let mut sum_size = 0;
body.and_then(move |chunk| {
sum_size += chunk.len();
if sum_size > MAX_DNS_QUESTION_LEN {
Err(hyper::error::Error::TooLarge)
} else {
Ok(chunk)
fn read_body_and_proxy(&self, body: Body) -> impl Future<Item = Response, Error = ()> {
let mut sum_size = 0;
let fut = body.and_then(move |chunk| {
sum_size += chunk.len();
if sum_size > MAX_DNS_QUESTION_LEN {
Err(hyper::error::Error::TooLarge)
} else {
Ok(chunk)
}
}).concat2()
.map_err(move |_err| ())
.map(move |chunk| chunk.to_vec())
.and_then(move |query| {
if query.len() < MIN_DNS_PACKET_LEN {
return Box::new(future::err(())) as Box<Future<Item = _, Error = _>>;
}
}).concat2()
.map_err(|_err| ())
.map(|chunk| chunk.to_vec())
})?;
if query.len() < MIN_DNS_PACKET_LEN {
return Err(());
}
await!(Self::proxy(query))
Box::new(Self::proxy(query))
});
fut
}
}