From 250b9768c90b6e24b5ef276f5441dc01fc6dd26e Mon Sep 17 00:00:00 2001 From: Will Brown Date: Sun, 9 Jan 2022 22:12:31 -0500 Subject: [PATCH] add socket options (#92) --- ntex-tokio/src/io.rs | 36 +++++++++++++++++++++++++++++++++--- ntex-tokio/src/lib.rs | 2 +- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/ntex-tokio/src/io.rs b/ntex-tokio/src/io.rs index 99856895..ca726017 100644 --- a/ntex-tokio/src/io.rs +++ b/ntex-tokio/src/io.rs @@ -1,12 +1,12 @@ use std::task::{Context, Poll}; -use std::{any, cell::RefCell, cmp, future::Future, io, mem, pin::Pin, rc::Rc}; +use std::{any, cell::RefCell, cmp, future::Future, io, mem, pin::Pin, rc::Rc, rc::Weak}; use ntex_bytes::{Buf, BufMut, BytesMut}; use ntex_io::{ types, Filter, Handle, Io, IoBoxed, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus, }; -use ntex_util::{ready, time::sleep, time::Sleep}; +use ntex_util::{ready, time::sleep, time::Millis, time::Sleep}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; @@ -28,6 +28,8 @@ impl Handle for HandleWrapper { if let Ok(addr) = self.0.borrow().peer_addr() { return Some(Box::new(types::PeerAddr(addr))); } + } else if id == any::TypeId::of::() { + return Some(Box::new(SocketOptions(Rc::downgrade(&self.0)))); } None } @@ -199,7 +201,15 @@ impl Future for WriteTask { Poll::Ready(WriteStatus::Terminate) => { log::trace!("write task is instructed to terminate"); - let _ = Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx); + if !matches!( + this.io.borrow().linger(), + Ok(Some(std::time::Duration::ZERO)) + ) { + // call shutdown to prevent flushing data on terminated Io. when + // linger is set to zero, closing will reset the connection, so + // shutdown is not neccessary. + let _ = Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx); + } this.state.close(None); Poll::Ready(()) } @@ -433,6 +443,26 @@ impl AsyncWrite for TokioIoBoxed { } } +/// Query TCP Io connections for a handle to set socket options +pub struct SocketOptions(Weak>); + +impl SocketOptions { + pub fn set_linger(&self, dur: Option) -> io::Result<()> { + self.try_self() + .and_then(|s| s.borrow().set_linger(dur.map(|d| d.into()))) + } + + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.try_self().and_then(|s| s.borrow().set_ttl(ttl)) + } + + fn try_self(&self) -> io::Result>> { + self.0 + .upgrade() + .ok_or_else(|| io::Error::new(io::ErrorKind::NotConnected, "socket is gone")) + } +} + #[cfg(unix)] mod unixstream { use tokio::net::UnixStream; diff --git a/ntex-tokio/src/lib.rs b/ntex-tokio/src/lib.rs index a3bd3b50..b157829d 100644 --- a/ntex-tokio/src/lib.rs +++ b/ntex-tokio/src/lib.rs @@ -6,7 +6,7 @@ use ntex_io::Io; mod io; mod signals; -pub use self::io::TokioIoBoxed; +pub use self::io::{SocketOptions, TokioIoBoxed}; pub use self::signals::{signal, Signal}; struct TcpStream(tokio::net::TcpStream);