diff --git a/ntex-io/src/filter.rs b/ntex-io/src/filter.rs index ee2cf5c8..2792d210 100644 --- a/ntex-io/src/filter.rs +++ b/ntex-io/src/filter.rs @@ -1,4 +1,4 @@ -use std::{io, rc::Rc, task::Context, task::Poll}; +use std::{any, io, rc::Rc, task::Context, task::Poll}; use ntex_bytes::BytesMut; @@ -26,6 +26,16 @@ impl Filter for DefaultFilter { Poll::Ready(Ok(())) } + + fn query(&self, id: any::TypeId) -> Option> { + if let Some(hnd) = self.0.handle.take() { + let res = hnd.query(id); + self.0.handle.set(Some(hnd)); + res + } else { + None + } + } } impl ReadFilter for DefaultFilter { @@ -140,6 +150,10 @@ impl Filter for NullFilter { fn shutdown(&self, _: &IoRef) -> Poll> { Poll::Ready(Ok(())) } + + fn query(&self, _: any::TypeId) -> Option> { + None + } } impl ReadFilter for NullFilter { diff --git a/ntex-io/src/lib.rs b/ntex-io/src/lib.rs index b9e01a61..b05a4957 100644 --- a/ntex-io/src/lib.rs +++ b/ntex-io/src/lib.rs @@ -7,6 +7,7 @@ mod filter; mod state; mod tasks; mod time; +pub mod types; mod utils; #[cfg(feature = "tokio")] @@ -14,7 +15,7 @@ mod tokio_impl; use ntex_bytes::BytesMut; use ntex_codec::{Decoder, Encoder}; -use ntex_util::{channel::oneshot::Receiver, future::Either, time::Millis}; +use ntex_util::time::Millis; pub use self::dispatcher::Dispatcher; pub use self::filter::DefaultFilter; @@ -58,12 +59,7 @@ pub trait WriteFilter { pub trait Filter: ReadFilter + WriteFilter + 'static { fn shutdown(&self, st: &IoRef) -> Poll>; - fn query( - &self, - id: TypeId, - ) -> Either>, Receiver>>> { - Either::Left(None) - } + fn query(&self, id: TypeId) -> Option>; } pub trait FilterFactory: Sized { @@ -76,7 +72,11 @@ pub trait FilterFactory: Sized { } pub trait IoStream { - fn start(self, _: ReadContext, _: WriteContext); + fn start(self, _: ReadContext, _: WriteContext) -> Box; +} + +pub trait Handle { + fn query(&self, id: TypeId) -> Option>; } /// Framed transport item diff --git a/ntex-io/src/state.rs b/ntex-io/src/state.rs index be0e97b7..cb64d44e 100644 --- a/ntex-io/src/state.rs +++ b/ntex-io/src/state.rs @@ -1,6 +1,6 @@ use std::cell::{Cell, RefCell}; use std::task::{Context, Poll}; -use std::{fmt, future::Future, hash, io, mem, ops::Deref, pin::Pin, ptr, rc::Rc}; +use std::{any, fmt, future::Future, hash, io, mem, ops::Deref, pin::Pin, ptr, rc::Rc}; use ntex_bytes::{BytesMut, PoolId, PoolRef}; use ntex_codec::{Decoder, Encoder}; @@ -9,7 +9,7 @@ use ntex_util::{future::poll_fn, future::Either, task::LocalWaker}; use super::filter::{DefaultFilter, NullFilter}; use super::tasks::{ReadContext, WriteContext}; -use super::{Filter, FilterFactory, IoStream}; +use super::{types, Filter, FilterFactory, Handle, IoStream}; bitflags::bitflags! { pub struct Flags: u16 { @@ -66,6 +66,7 @@ pub(crate) struct IoStateInner { pub(super) read_buf: Cell>, pub(super) write_buf: Cell>, pub(super) filter: Cell<&'static dyn Filter>, + pub(super) handle: Cell>>, on_disconnect: RefCell>>, } @@ -198,6 +199,7 @@ impl Io { read_buf: Cell::new(None), write_buf: Cell::new(None), filter: Cell::new(NullFilter::get()), + handle: Cell::new(None), on_disconnect: RefCell::new(Vec::new()), }); @@ -211,7 +213,8 @@ impl Io { let io_ref = IoRef(inner); // start io tasks - io.start(ReadContext(io_ref.clone()), WriteContext(io_ref.clone())); + let hnd = io.start(ReadContext(io_ref.clone()), WriteContext(io_ref.clone())); + io_ref.0.handle.set(Some(hnd)); Io(io_ref, FilterItem::Ptr(Box::into_raw(filter))) } @@ -363,8 +366,12 @@ impl IoRef { #[inline] /// Query specific data - pub fn query(&self) -> Option { - todo!() + pub fn query(&self) -> types::QueryItem { + if let Some(item) = self.0.filter.get().query(any::TypeId::of::()) { + types::QueryItem::new(item) + } else { + types::QueryItem::empty() + } } } @@ -853,18 +860,17 @@ impl<'a> ReadRef<'a> { U: Decoder, { let mut buf = self.0.read_buf.take(); - let result = if let Some(ref mut buf) = buf { - let result = codec.decode(buf); + if let Some(ref mut b) = buf { + let result = codec.decode(b); if result.as_ref().map(|v| v.is_none()).unwrap_or(false) { self.0.remove_flags(Flags::RD_READY); } + self.0.read_buf.set(buf); result } else { self.0.remove_flags(Flags::RD_READY); Ok(None) - }; - self.0.read_buf.set(buf); - result + } } #[inline] diff --git a/ntex-io/src/testing.rs b/ntex-io/src/testing.rs index 1277d0d3..24123f01 100644 --- a/ntex-io/src/testing.rs +++ b/ntex-io/src/testing.rs @@ -1,13 +1,13 @@ use std::cell::{Cell, RefCell}; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Waker}; -use std::{cmp, fmt, future::Future, io, mem, pin::Pin, rc::Rc}; +use std::{any, cmp, fmt, future::Future, io, mem, net, pin::Pin, rc::Rc}; use ntex_bytes::{Buf, BufMut, BytesMut}; use ntex_util::future::poll_fn; use ntex_util::time::{sleep, Millis, Sleep}; -use crate::{IoStream, ReadContext, WriteContext, WriteReadiness}; +use crate::{types, Handle, IoStream, ReadContext, WriteContext, WriteReadiness}; #[derive(Default)] struct AtomicWaker(Arc>>>); @@ -30,6 +30,7 @@ impl fmt::Debug for AtomicWaker { #[derive(Debug)] pub struct IoTest { tp: Type, + peer_addr: Option, state: Arc>, local: Arc>>, remote: Arc>>, @@ -102,12 +103,14 @@ impl IoTest { ( IoTest { tp: Type::Client, + peer_addr: None, local: local.clone(), remote: remote.clone(), state: state.clone(), }, IoTest { state, + peer_addr: None, tp: Type::Server, local: remote, remote: local, @@ -128,6 +131,12 @@ impl IoTest { self.remote.lock().unwrap().borrow().is_closed() } + /// Set peer addr + pub fn set_peer_addr(mut self, addr: net::SocketAddr) -> Self { + self.peer_addr = Some(addr); + self + } + /// Set read to Pending state pub fn read_pending(&self) { self.remote.lock().unwrap().borrow_mut().read = IoState::Pending; @@ -317,6 +326,7 @@ impl Clone for IoTest { local: self.local.clone(), remote: self.remote.clone(), state: self.state.clone(), + peer_addr: self.peer_addr, } } } @@ -444,7 +454,7 @@ mod tokio { } impl IoStream for IoTest { - fn start(self, read: ReadContext, write: WriteContext) { + fn start(self, read: ReadContext, write: WriteContext) -> Box { let io = Rc::new(self); ntex_util::spawn(ReadTask { @@ -452,10 +462,23 @@ impl IoStream for IoTest { state: read, }); ntex_util::spawn(WriteTask { - io, + io: io.clone(), state: write, st: IoWriteState::Processing(None), }); + + Box::new(io) + } +} + +impl Handle for Rc { + fn query(&self, id: any::TypeId) -> Option> { + if id == any::TypeId::of::() { + if let Some(addr) = self.peer_addr { + return Some(Box::new(addr)); + } + } + None } } diff --git a/ntex-io/src/tokio_impl.rs b/ntex-io/src/tokio_impl.rs index e281a076..ca1c1d85 100644 --- a/ntex-io/src/tokio_impl.rs +++ b/ntex-io/src/tokio_impl.rs @@ -1,31 +1,45 @@ use std::task::{Context, Poll}; -use std::{cell::RefCell, cmp, future::Future, io, pin::Pin, rc::Rc}; +use std::{any, cell::RefCell, cmp, future::Future, io, pin::Pin, rc::Rc}; use ntex_bytes::{Buf, BufMut}; use ntex_util::time::{sleep, Sleep}; use tok_io::io::{AsyncRead, AsyncWrite, ReadBuf}; use tok_io::net::TcpStream; -use super::{Filter, Io, IoStream, ReadContext, WriteContext, WriteReadiness}; +use super::{ + types, Filter, Handle, Io, IoStream, ReadContext, WriteContext, WriteReadiness, +}; impl IoStream for TcpStream { - fn start(self, read: ReadContext, write: WriteContext) { + fn start(self, read: ReadContext, write: WriteContext) -> Box { let io = Rc::new(RefCell::new(self)); ntex_util::spawn(ReadTask::new(io.clone(), read)); - ntex_util::spawn(WriteTask::new(io, write)); + ntex_util::spawn(WriteTask::new(io.clone(), write)); + Box::new(io) } } #[cfg(unix)] impl IoStream for tok_io::net::UnixStream { - fn start(self, _read: ReadContext, _write: WriteContext) { + fn start(self, _read: ReadContext, _write: WriteContext) -> Box { let _io = Rc::new(RefCell::new(self)); todo!() } } +impl Handle for Rc> { + fn query(&self, id: any::TypeId) -> Option> { + if id == any::TypeId::of::() { + if let Ok(addr) = self.borrow().peer_addr() { + return Some(Box::new(addr)); + } + } + None + } +} + /// Read io task struct ReadTask { io: Rc>, diff --git a/ntex-io/src/types.rs b/ntex-io/src/types.rs new file mode 100644 index 00000000..1539936e --- /dev/null +++ b/ntex-io/src/types.rs @@ -0,0 +1,38 @@ +use std::{any, fmt, marker::PhantomData, net::SocketAddr}; + +pub struct PeerAddr(pub SocketAddr); + +impl fmt::Debug for PeerAddr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +pub struct QueryItem { + item: Option>, + _t: PhantomData, +} + +impl QueryItem { + pub(crate) fn new(item: Box) -> Self { + Self { + item: Some(item), + _t: PhantomData, + } + } + + pub(crate) fn empty() -> Self { + Self { + item: None, + _t: PhantomData, + } + } + + pub fn get(&self) -> Option<&T> { + if let Some(ref item) = self.item { + item.downcast_ref() + } else { + None + } + } +} diff --git a/ntex/src/http/h1/service.rs b/ntex/src/http/h1/service.rs index d6888781..2976b524 100644 --- a/ntex/src/http/h1/service.rs +++ b/ntex/src/http/h1/service.rs @@ -9,7 +9,7 @@ use crate::http::error::{DispatchError, ResponseError}; use crate::http::helpers::DataFactory; use crate::http::request::Request; use crate::http::response::Response; -use crate::io::{DefaultFilter, Filter, Io, IoRef}; +use crate::io::{types, DefaultFilter, Filter, Io, IoRef}; use crate::service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; use crate::{time::Millis, util::Pool}; @@ -367,6 +367,11 @@ where } fn call(&self, io: Self::Request) -> Self::Future { + log::trace!( + "New http1 connection, peer address {:?}", + io.query::().get() + ); + Dispatcher::new(io, self.config.clone()) } } diff --git a/ntex/src/http/h2/service.rs b/ntex/src/http/h2/service.rs index c80a761a..3001beee 100644 --- a/ntex/src/http/h2/service.rs +++ b/ntex/src/http/h2/service.rs @@ -10,8 +10,7 @@ use crate::http::error::{DispatchError, ResponseError}; use crate::http::helpers::DataFactory; use crate::http::request::Request; use crate::http::response::Response; -use crate::io::{Filter, Io, IoRef}; -use crate::rt::net::TcpStream; +use crate::io::{types, Filter, Io, IoRef}; use crate::service::{ fn_factory, fn_service, pipeline_factory, IntoServiceFactory, Service, ServiceFactory, @@ -216,7 +215,10 @@ where } fn call(&self, io: Self::Request) -> Self::Future { - // trace!("New http2 connection, peer address: {:?}", addr); + log::trace!( + "New http2 connection, peer address {:?}", + io.query::().get() + ); H2ServiceHandlerResponse { state: State::Handshake( diff --git a/ntex/src/http/message.rs b/ntex/src/http/message.rs index 0c3726f6..c07d6547 100644 --- a/ntex/src/http/message.rs +++ b/ntex/src/http/message.rs @@ -1,12 +1,10 @@ -use std::cell::{Ref, RefCell, RefMut}; -use std::net; -use std::rc::Rc; +use std::{cell::Ref, cell::RefCell, cell::RefMut, net, rc::Rc}; use bitflags::bitflags; use crate::http::header::HeaderMap; use crate::http::{header, Method, StatusCode, Uri, Version}; -use crate::io::IoRef; +use crate::io::{types, IoRef}; use crate::util::Extensions; /// Represents various types of connection @@ -174,9 +172,10 @@ impl RequestHead { /// ntex http server, then peer address would be address of this proxy. #[inline] pub fn peer_addr(&self) -> Option { - // TODO! fix - // self.head().peer_addr - None + self.io + .as_ref() + .map(|io| io.query::().get().map(|addr| addr.0)) + .unwrap_or(None) } } diff --git a/ntex/src/http/request.rs b/ntex/src/http/request.rs index ac4112d9..3b14bb69 100644 --- a/ntex/src/http/request.rs +++ b/ntex/src/http/request.rs @@ -1,12 +1,10 @@ use std::{cell::Ref, cell::RefMut, fmt, mem, net}; -use http::{header, Method, Uri, Version}; - -use crate::http::header::HeaderMap; +use crate::http::header::{self, HeaderMap}; use crate::http::httpmessage::HttpMessage; use crate::http::message::{Message, RequestHead}; -use crate::http::payload::Payload; -use crate::io::IoRef; +use crate::http::{payload::Payload, Method, Uri, Version}; +use crate::io::{types, IoRef}; use crate::util::Extensions; /// Request @@ -139,9 +137,11 @@ impl Request { /// ntex http server, then peer address would be address of this proxy. #[inline] pub fn peer_addr(&self) -> Option { - // TODO! fix - // self.head().peer_addr - None + self.head() + .io + .as_ref() + .map(|io| io.query::().get().map(|addr| addr.0)) + .unwrap_or(None) } /// Get request's payload diff --git a/ntex/src/http/service.rs b/ntex/src/http/service.rs index f2e6315e..098081f4 100644 --- a/ntex/src/http/service.rs +++ b/ntex/src/http/service.rs @@ -6,7 +6,7 @@ use std::{ use h2::server::{self, Handshake}; use crate::codec::{AsyncRead, AsyncWrite}; -use crate::io::{DefaultFilter, Filter, Io, IoRef}; +use crate::io::{types, DefaultFilter, Filter, Io, IoRef}; use crate::rt::net::TcpStream; use crate::service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; use crate::time::{Millis, Seconds}; @@ -438,7 +438,10 @@ where } fn call(&self, io: Self::Request) -> Self::Future { - // log::trace!("New http connection protocol {:?}", proto); + log::trace!( + "New http connection, peer address {:?}", + io.query::().get() + ); //match proto { //Protocol::Http2 => todo!(), diff --git a/ntex/src/web/httprequest.rs b/ntex/src/web/httprequest.rs index a084d7bf..03b3e475 100644 --- a/ntex/src/web/httprequest.rs +++ b/ntex/src/web/httprequest.rs @@ -3,7 +3,7 @@ use std::{cell::Ref, cell::RefCell, cell::RefMut, fmt, net, rc::Rc}; use crate::http::{ HeaderMap, HttpMessage, Message, Method, Payload, RequestHead, Uri, Version, }; -use crate::io::IoRef; +use crate::io::{types, IoRef}; use crate::router::Path; use crate::util::{Extensions, Ready}; @@ -118,9 +118,9 @@ impl HttpRequest { /// ntex http server, then peer address would be address of this proxy. #[inline] pub fn peer_addr(&self) -> Option { - // TODO! fix - // self.head().peer_addr - None + self.io() + .map(|io| io.query::().get().map(|addr| addr.0)) + .unwrap_or(None) } /// Get a reference to the Path parameters. diff --git a/ntex/src/web/request.rs b/ntex/src/web/request.rs index a04b9f41..da2244c4 100644 --- a/ntex/src/web/request.rs +++ b/ntex/src/web/request.rs @@ -1,12 +1,9 @@ -use std::cell::{Ref, RefMut}; -use std::marker::PhantomData; -use std::rc::Rc; -use std::{fmt, net}; +use std::{cell::Ref, cell::RefMut, fmt, marker::PhantomData, net, rc::Rc}; use crate::http::{ header, HeaderMap, HttpMessage, Method, Payload, RequestHead, Response, Uri, Version, }; -use crate::io::IoRef; +use crate::io::{types, IoRef}; use crate::router::{Path, Resource}; use crate::util::Extensions; @@ -154,6 +151,21 @@ impl WebRequest { } } + /// Peer socket address + /// + /// Peer address is actual socket address, if proxy is used in front of + /// ntex http server, then peer address would be address of this proxy. + /// + /// To get client connection information `ConnectionInfo` should be used. + #[inline] + pub fn peer_addr(&self) -> Option { + self.head() + .io + .as_ref() + .map(|io| io.query::().get().map(|addr| addr.0)) + .unwrap_or(None) + } + /// Get *ConnectionInfo* for the current request. #[inline] pub fn connection_info(&self) -> Ref<'_, ConnectionInfo> {