add socket options (#92)

This commit is contained in:
Will Brown 2022-01-09 22:12:31 -05:00 committed by GitHub
parent eac1f068fb
commit 250b9768c9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 4 deletions

View file

@ -1,12 +1,12 @@
use std::task::{Context, Poll};
use std::{any, cell::RefCell, cmp, future::Future, io, mem, pin::Pin, rc::Rc};
use std::{any, cell::RefCell, cmp, future::Future, io, mem, pin::Pin, rc::Rc, rc::Weak};
use ntex_bytes::{Buf, BufMut, BytesMut};
use ntex_io::{
types, Filter, Handle, Io, IoBoxed, IoStream, ReadContext, ReadStatus, WriteContext,
WriteStatus,
};
use ntex_util::{ready, time::sleep, time::Sleep};
use ntex_util::{ready, time::sleep, time::Millis, time::Sleep};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
@ -28,6 +28,8 @@ impl Handle for HandleWrapper {
if let Ok(addr) = self.0.borrow().peer_addr() {
return Some(Box::new(types::PeerAddr(addr)));
}
} else if id == any::TypeId::of::<SocketOptions>() {
return Some(Box::new(SocketOptions(Rc::downgrade(&self.0))));
}
None
}
@ -199,7 +201,15 @@ impl Future for WriteTask {
Poll::Ready(WriteStatus::Terminate) => {
log::trace!("write task is instructed to terminate");
let _ = Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx);
if !matches!(
this.io.borrow().linger(),
Ok(Some(std::time::Duration::ZERO))
) {
// call shutdown to prevent flushing data on terminated Io. when
// linger is set to zero, closing will reset the connection, so
// shutdown is not neccessary.
let _ = Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx);
}
this.state.close(None);
Poll::Ready(())
}
@ -433,6 +443,26 @@ impl AsyncWrite for TokioIoBoxed {
}
}
/// Query TCP Io connections for a handle to set socket options
pub struct SocketOptions(Weak<RefCell<TcpStream>>);
impl SocketOptions {
pub fn set_linger(&self, dur: Option<Millis>) -> io::Result<()> {
self.try_self()
.and_then(|s| s.borrow().set_linger(dur.map(|d| d.into())))
}
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
self.try_self().and_then(|s| s.borrow().set_ttl(ttl))
}
fn try_self(&self) -> io::Result<Rc<RefCell<TcpStream>>> {
self.0
.upgrade()
.ok_or_else(|| io::Error::new(io::ErrorKind::NotConnected, "socket is gone"))
}
}
#[cfg(unix)]
mod unixstream {
use tokio::net::UnixStream;

View file

@ -6,7 +6,7 @@ use ntex_io::Io;
mod io;
mod signals;
pub use self::io::TokioIoBoxed;
pub use self::io::{SocketOptions, TokioIoBoxed};
pub use self::signals::{signal, Signal};
struct TcpStream(tokio::net::TcpStream);