diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 2921d3ae..b0aae8bd 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -6,6 +6,8 @@ * Do not set `reuse_address` for tcp listener on window os +* Set nodelay to accept/connect sockets + * Update ntex-router v0.4.1 * Update cookie v0.15.0 diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 75a58e51..9a47184e 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -50,8 +50,8 @@ derive_more = "0.99.11" either = "1.6.1" encoding_rs = "0.8.26" futures = "0.3.13" -ahash = "0.7.1" -h2 = "0.3" +ahash = "0.7.2" +h2 = "0.3.1" http = "0.2.1" httparse = "1.3" log = "0.4" diff --git a/ntex/src/connect/service.rs b/ntex/src/connect/service.rs index 02b4c3c6..c4a1b3f4 100644 --- a/ntex/src/connect/service.rs +++ b/ntex/src/connect/service.rs @@ -170,6 +170,16 @@ impl TcpConnectorResponse { }, } } + + fn can_continue(&self, err: &io::Error) -> bool { + trace!( + "TCP connector - failed to connect to connecting to {:?} port: {} err: {:?}", + self.req.as_ref().unwrap().host(), + self.port, + err + ); + !(self.addrs.is_none() || self.addrs.as_ref().unwrap().is_empty()) + } } impl Future for TcpConnectorResponse { @@ -183,6 +193,12 @@ impl Future for TcpConnectorResponse { if let Some(new) = this.stream.as_mut() { match new.as_mut().poll(cx) { Poll::Ready(Ok(sock)) => { + if let Err(err) = sock.set_nodelay(true) { + if !this.can_continue(&err) { + return Poll::Ready(Err(err.into())); + } + } + let req = this.req.take().unwrap(); trace!( "TCP connector - successfully connected to connecting to {:?} - {:?}", @@ -192,14 +208,7 @@ impl Future for TcpConnectorResponse { } Poll::Pending => return Poll::Pending, Poll::Ready(Err(err)) => { - trace!( - "TCP connector - failed to connect to connecting to {:?} port: {}", - this.req.as_ref().unwrap().host(), - this.port, - ); - if this.addrs.is_none() - || this.addrs.as_ref().unwrap().is_empty() - { + if !this.can_continue(&err) { return Poll::Ready(Err(err.into())); } } diff --git a/ntex/src/server/service.rs b/ntex/src/server/service.rs index ab9ef4e0..0be53264 100644 --- a/ntex/src/server/service.rs +++ b/ntex/src/server/service.rs @@ -85,7 +85,7 @@ where match req { ServerMessage::Connect(stream) => { let stream = FromStream::from_stream(stream).map_err(|e| { - error!("Can not convert to an async tcp stream: {}", e); + error!("Can not convert to an async io stream: {}", e); }); if let Ok(stream) = stream { diff --git a/ntex/src/server/socket.rs b/ntex/src/server/socket.rs index 6d21766c..e92ee342 100644 --- a/ntex/src/server/socket.rs +++ b/ntex/src/server/socket.rs @@ -157,7 +157,9 @@ impl FromStream for TcpStream { Stream::Tcp(stream) => { use std::os::unix::io::{FromRawFd, IntoRawFd}; let fd = IntoRawFd::into_raw_fd(stream); - TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(fd) }) + let io = TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(fd) })?; + io.set_nodelay(true)?; + Ok(io) } #[cfg(unix)] Stream::Uds(_) => { @@ -174,7 +176,10 @@ impl FromStream for TcpStream { Stream::Tcp(stream) => { use std::os::windows::io::{FromRawSocket, IntoRawSocket}; let fd = IntoRawSocket::into_raw_socket(stream); - TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(fd) }) + let io = + TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(fd) })?; + io.set_nodelay(true)?; + Ok(io) } #[cfg(unix)] Stream::Uds(_) => { @@ -190,11 +195,10 @@ impl FromStream for crate::rt::net::UnixStream { match sock { Stream::Tcp(_) => panic!("Should not happen, bug in server impl"), Stream::Uds(stream) => { + use crate::rt::net::UnixStream; use std::os::unix::io::{FromRawFd, IntoRawFd}; let fd = IntoRawFd::into_raw_fd(stream); - crate::rt::net::UnixStream::from_std(unsafe { - FromRawFd::from_raw_fd(fd) - }) + UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(fd) }) } } }