add io query system

This commit is contained in:
Nikolay Kim 2021-12-15 22:38:15 +06:00
parent 399b238621
commit 3ed5580f86
13 changed files with 174 additions and 58 deletions

View file

@ -1,4 +1,4 @@
use std::{io, rc::Rc, task::Context, task::Poll}; use std::{any, io, rc::Rc, task::Context, task::Poll};
use ntex_bytes::BytesMut; use ntex_bytes::BytesMut;
@ -26,6 +26,16 @@ impl Filter for DefaultFilter {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
if let Some(hnd) = self.0.handle.take() {
let res = hnd.query(id);
self.0.handle.set(Some(hnd));
res
} else {
None
}
}
} }
impl ReadFilter for DefaultFilter { impl ReadFilter for DefaultFilter {
@ -140,6 +150,10 @@ impl Filter for NullFilter {
fn shutdown(&self, _: &IoRef) -> Poll<Result<(), io::Error>> { fn shutdown(&self, _: &IoRef) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
fn query(&self, _: any::TypeId) -> Option<Box<dyn any::Any>> {
None
}
} }
impl ReadFilter for NullFilter { impl ReadFilter for NullFilter {

View file

@ -7,6 +7,7 @@ mod filter;
mod state; mod state;
mod tasks; mod tasks;
mod time; mod time;
pub mod types;
mod utils; mod utils;
#[cfg(feature = "tokio")] #[cfg(feature = "tokio")]
@ -14,7 +15,7 @@ mod tokio_impl;
use ntex_bytes::BytesMut; use ntex_bytes::BytesMut;
use ntex_codec::{Decoder, Encoder}; use ntex_codec::{Decoder, Encoder};
use ntex_util::{channel::oneshot::Receiver, future::Either, time::Millis}; use ntex_util::time::Millis;
pub use self::dispatcher::Dispatcher; pub use self::dispatcher::Dispatcher;
pub use self::filter::DefaultFilter; pub use self::filter::DefaultFilter;
@ -58,12 +59,7 @@ pub trait WriteFilter {
pub trait Filter: ReadFilter + WriteFilter + 'static { pub trait Filter: ReadFilter + WriteFilter + 'static {
fn shutdown(&self, st: &IoRef) -> Poll<Result<(), io::Error>>; fn shutdown(&self, st: &IoRef) -> Poll<Result<(), io::Error>>;
fn query( fn query(&self, id: TypeId) -> Option<Box<dyn Any>>;
&self,
id: TypeId,
) -> Either<Option<Box<dyn Any>>, Receiver<Option<Box<dyn Any>>>> {
Either::Left(None)
}
} }
pub trait FilterFactory<F: Filter>: Sized { pub trait FilterFactory<F: Filter>: Sized {
@ -76,7 +72,11 @@ pub trait FilterFactory<F: Filter>: Sized {
} }
pub trait IoStream { pub trait IoStream {
fn start(self, _: ReadContext, _: WriteContext); fn start(self, _: ReadContext, _: WriteContext) -> Box<dyn Handle>;
}
pub trait Handle {
fn query(&self, id: TypeId) -> Option<Box<dyn Any>>;
} }
/// Framed transport item /// Framed transport item

View file

@ -1,6 +1,6 @@
use std::cell::{Cell, RefCell}; use std::cell::{Cell, RefCell};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{fmt, future::Future, hash, io, mem, ops::Deref, pin::Pin, ptr, rc::Rc}; use std::{any, fmt, future::Future, hash, io, mem, ops::Deref, pin::Pin, ptr, rc::Rc};
use ntex_bytes::{BytesMut, PoolId, PoolRef}; use ntex_bytes::{BytesMut, PoolId, PoolRef};
use ntex_codec::{Decoder, Encoder}; use ntex_codec::{Decoder, Encoder};
@ -9,7 +9,7 @@ use ntex_util::{future::poll_fn, future::Either, task::LocalWaker};
use super::filter::{DefaultFilter, NullFilter}; use super::filter::{DefaultFilter, NullFilter};
use super::tasks::{ReadContext, WriteContext}; use super::tasks::{ReadContext, WriteContext};
use super::{Filter, FilterFactory, IoStream}; use super::{types, Filter, FilterFactory, Handle, IoStream};
bitflags::bitflags! { bitflags::bitflags! {
pub struct Flags: u16 { pub struct Flags: u16 {
@ -66,6 +66,7 @@ pub(crate) struct IoStateInner {
pub(super) read_buf: Cell<Option<BytesMut>>, pub(super) read_buf: Cell<Option<BytesMut>>,
pub(super) write_buf: Cell<Option<BytesMut>>, pub(super) write_buf: Cell<Option<BytesMut>>,
pub(super) filter: Cell<&'static dyn Filter>, pub(super) filter: Cell<&'static dyn Filter>,
pub(super) handle: Cell<Option<Box<dyn Handle>>>,
on_disconnect: RefCell<Vec<Option<LocalWaker>>>, on_disconnect: RefCell<Vec<Option<LocalWaker>>>,
} }
@ -198,6 +199,7 @@ impl Io {
read_buf: Cell::new(None), read_buf: Cell::new(None),
write_buf: Cell::new(None), write_buf: Cell::new(None),
filter: Cell::new(NullFilter::get()), filter: Cell::new(NullFilter::get()),
handle: Cell::new(None),
on_disconnect: RefCell::new(Vec::new()), on_disconnect: RefCell::new(Vec::new()),
}); });
@ -211,7 +213,8 @@ impl Io {
let io_ref = IoRef(inner); let io_ref = IoRef(inner);
// start io tasks // start io tasks
io.start(ReadContext(io_ref.clone()), WriteContext(io_ref.clone())); let hnd = io.start(ReadContext(io_ref.clone()), WriteContext(io_ref.clone()));
io_ref.0.handle.set(Some(hnd));
Io(io_ref, FilterItem::Ptr(Box::into_raw(filter))) Io(io_ref, FilterItem::Ptr(Box::into_raw(filter)))
} }
@ -363,8 +366,12 @@ impl IoRef {
#[inline] #[inline]
/// Query specific data /// Query specific data
pub fn query<T: 'static>(&self) -> Option<T> { pub fn query<T: 'static>(&self) -> types::QueryItem<T> {
todo!() if let Some(item) = self.0.filter.get().query(any::TypeId::of::<T>()) {
types::QueryItem::new(item)
} else {
types::QueryItem::empty()
}
} }
} }
@ -853,18 +860,17 @@ impl<'a> ReadRef<'a> {
U: Decoder, U: Decoder,
{ {
let mut buf = self.0.read_buf.take(); let mut buf = self.0.read_buf.take();
let result = if let Some(ref mut buf) = buf { if let Some(ref mut b) = buf {
let result = codec.decode(buf); let result = codec.decode(b);
if result.as_ref().map(|v| v.is_none()).unwrap_or(false) { if result.as_ref().map(|v| v.is_none()).unwrap_or(false) {
self.0.remove_flags(Flags::RD_READY); self.0.remove_flags(Flags::RD_READY);
} }
self.0.read_buf.set(buf);
result result
} else { } else {
self.0.remove_flags(Flags::RD_READY); self.0.remove_flags(Flags::RD_READY);
Ok(None) Ok(None)
}; }
self.0.read_buf.set(buf);
result
} }
#[inline] #[inline]

View file

@ -1,13 +1,13 @@
use std::cell::{Cell, RefCell}; use std::cell::{Cell, RefCell};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker}; use std::task::{Context, Poll, Waker};
use std::{cmp, fmt, future::Future, io, mem, pin::Pin, rc::Rc}; use std::{any, cmp, fmt, future::Future, io, mem, net, pin::Pin, rc::Rc};
use ntex_bytes::{Buf, BufMut, BytesMut}; use ntex_bytes::{Buf, BufMut, BytesMut};
use ntex_util::future::poll_fn; use ntex_util::future::poll_fn;
use ntex_util::time::{sleep, Millis, Sleep}; use ntex_util::time::{sleep, Millis, Sleep};
use crate::{IoStream, ReadContext, WriteContext, WriteReadiness}; use crate::{types, Handle, IoStream, ReadContext, WriteContext, WriteReadiness};
#[derive(Default)] #[derive(Default)]
struct AtomicWaker(Arc<Mutex<RefCell<Option<Waker>>>>); struct AtomicWaker(Arc<Mutex<RefCell<Option<Waker>>>>);
@ -30,6 +30,7 @@ impl fmt::Debug for AtomicWaker {
#[derive(Debug)] #[derive(Debug)]
pub struct IoTest { pub struct IoTest {
tp: Type, tp: Type,
peer_addr: Option<net::SocketAddr>,
state: Arc<Cell<State>>, state: Arc<Cell<State>>,
local: Arc<Mutex<RefCell<Channel>>>, local: Arc<Mutex<RefCell<Channel>>>,
remote: Arc<Mutex<RefCell<Channel>>>, remote: Arc<Mutex<RefCell<Channel>>>,
@ -102,12 +103,14 @@ impl IoTest {
( (
IoTest { IoTest {
tp: Type::Client, tp: Type::Client,
peer_addr: None,
local: local.clone(), local: local.clone(),
remote: remote.clone(), remote: remote.clone(),
state: state.clone(), state: state.clone(),
}, },
IoTest { IoTest {
state, state,
peer_addr: None,
tp: Type::Server, tp: Type::Server,
local: remote, local: remote,
remote: local, remote: local,
@ -128,6 +131,12 @@ impl IoTest {
self.remote.lock().unwrap().borrow().is_closed() self.remote.lock().unwrap().borrow().is_closed()
} }
/// Set peer addr
pub fn set_peer_addr(mut self, addr: net::SocketAddr) -> Self {
self.peer_addr = Some(addr);
self
}
/// Set read to Pending state /// Set read to Pending state
pub fn read_pending(&self) { pub fn read_pending(&self) {
self.remote.lock().unwrap().borrow_mut().read = IoState::Pending; self.remote.lock().unwrap().borrow_mut().read = IoState::Pending;
@ -317,6 +326,7 @@ impl Clone for IoTest {
local: self.local.clone(), local: self.local.clone(),
remote: self.remote.clone(), remote: self.remote.clone(),
state: self.state.clone(), state: self.state.clone(),
peer_addr: self.peer_addr,
} }
} }
} }
@ -444,7 +454,7 @@ mod tokio {
} }
impl IoStream for IoTest { impl IoStream for IoTest {
fn start(self, read: ReadContext, write: WriteContext) { fn start(self, read: ReadContext, write: WriteContext) -> Box<dyn Handle> {
let io = Rc::new(self); let io = Rc::new(self);
ntex_util::spawn(ReadTask { ntex_util::spawn(ReadTask {
@ -452,10 +462,23 @@ impl IoStream for IoTest {
state: read, state: read,
}); });
ntex_util::spawn(WriteTask { ntex_util::spawn(WriteTask {
io, io: io.clone(),
state: write, state: write,
st: IoWriteState::Processing(None), st: IoWriteState::Processing(None),
}); });
Box::new(io)
}
}
impl Handle for Rc<IoTest> {
fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
if id == any::TypeId::of::<types::PeerAddr>() {
if let Some(addr) = self.peer_addr {
return Some(Box::new(addr));
}
}
None
} }
} }

View file

@ -1,31 +1,45 @@
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{cell::RefCell, cmp, future::Future, io, pin::Pin, rc::Rc}; use std::{any, cell::RefCell, cmp, future::Future, io, pin::Pin, rc::Rc};
use ntex_bytes::{Buf, BufMut}; use ntex_bytes::{Buf, BufMut};
use ntex_util::time::{sleep, Sleep}; use ntex_util::time::{sleep, Sleep};
use tok_io::io::{AsyncRead, AsyncWrite, ReadBuf}; use tok_io::io::{AsyncRead, AsyncWrite, ReadBuf};
use tok_io::net::TcpStream; use tok_io::net::TcpStream;
use super::{Filter, Io, IoStream, ReadContext, WriteContext, WriteReadiness}; use super::{
types, Filter, Handle, Io, IoStream, ReadContext, WriteContext, WriteReadiness,
};
impl IoStream for TcpStream { impl IoStream for TcpStream {
fn start(self, read: ReadContext, write: WriteContext) { fn start(self, read: ReadContext, write: WriteContext) -> Box<dyn Handle> {
let io = Rc::new(RefCell::new(self)); let io = Rc::new(RefCell::new(self));
ntex_util::spawn(ReadTask::new(io.clone(), read)); ntex_util::spawn(ReadTask::new(io.clone(), read));
ntex_util::spawn(WriteTask::new(io, write)); ntex_util::spawn(WriteTask::new(io.clone(), write));
Box::new(io)
} }
} }
#[cfg(unix)] #[cfg(unix)]
impl IoStream for tok_io::net::UnixStream { impl IoStream for tok_io::net::UnixStream {
fn start(self, _read: ReadContext, _write: WriteContext) { fn start(self, _read: ReadContext, _write: WriteContext) -> Box<dyn Handle> {
let _io = Rc::new(RefCell::new(self)); let _io = Rc::new(RefCell::new(self));
todo!() todo!()
} }
} }
impl Handle for Rc<RefCell<TcpStream>> {
fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
if id == any::TypeId::of::<types::PeerAddr>() {
if let Ok(addr) = self.borrow().peer_addr() {
return Some(Box::new(addr));
}
}
None
}
}
/// Read io task /// Read io task
struct ReadTask { struct ReadTask {
io: Rc<RefCell<TcpStream>>, io: Rc<RefCell<TcpStream>>,

38
ntex-io/src/types.rs Normal file
View file

@ -0,0 +1,38 @@
use std::{any, fmt, marker::PhantomData, net::SocketAddr};
pub struct PeerAddr(pub SocketAddr);
impl fmt::Debug for PeerAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
pub struct QueryItem<T> {
item: Option<Box<dyn any::Any>>,
_t: PhantomData<T>,
}
impl<T: any::Any> QueryItem<T> {
pub(crate) fn new(item: Box<dyn any::Any>) -> Self {
Self {
item: Some(item),
_t: PhantomData,
}
}
pub(crate) fn empty() -> Self {
Self {
item: None,
_t: PhantomData,
}
}
pub fn get(&self) -> Option<&T> {
if let Some(ref item) = self.item {
item.downcast_ref()
} else {
None
}
}
}

View file

@ -9,7 +9,7 @@ use crate::http::error::{DispatchError, ResponseError};
use crate::http::helpers::DataFactory; use crate::http::helpers::DataFactory;
use crate::http::request::Request; use crate::http::request::Request;
use crate::http::response::Response; use crate::http::response::Response;
use crate::io::{DefaultFilter, Filter, Io, IoRef}; use crate::io::{types, DefaultFilter, Filter, Io, IoRef};
use crate::service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; use crate::service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory};
use crate::{time::Millis, util::Pool}; use crate::{time::Millis, util::Pool};
@ -367,6 +367,11 @@ where
} }
fn call(&self, io: Self::Request) -> Self::Future { fn call(&self, io: Self::Request) -> Self::Future {
log::trace!(
"New http1 connection, peer address {:?}",
io.query::<types::PeerAddr>().get()
);
Dispatcher::new(io, self.config.clone()) Dispatcher::new(io, self.config.clone())
} }
} }

View file

@ -10,8 +10,7 @@ use crate::http::error::{DispatchError, ResponseError};
use crate::http::helpers::DataFactory; use crate::http::helpers::DataFactory;
use crate::http::request::Request; use crate::http::request::Request;
use crate::http::response::Response; use crate::http::response::Response;
use crate::io::{Filter, Io, IoRef}; use crate::io::{types, Filter, Io, IoRef};
use crate::rt::net::TcpStream;
use crate::service::{ use crate::service::{
fn_factory, fn_service, pipeline_factory, IntoServiceFactory, Service, fn_factory, fn_service, pipeline_factory, IntoServiceFactory, Service,
ServiceFactory, ServiceFactory,
@ -216,7 +215,10 @@ where
} }
fn call(&self, io: Self::Request) -> Self::Future { fn call(&self, io: Self::Request) -> Self::Future {
// trace!("New http2 connection, peer address: {:?}", addr); log::trace!(
"New http2 connection, peer address {:?}",
io.query::<types::PeerAddr>().get()
);
H2ServiceHandlerResponse { H2ServiceHandlerResponse {
state: State::Handshake( state: State::Handshake(

View file

@ -1,12 +1,10 @@
use std::cell::{Ref, RefCell, RefMut}; use std::{cell::Ref, cell::RefCell, cell::RefMut, net, rc::Rc};
use std::net;
use std::rc::Rc;
use bitflags::bitflags; use bitflags::bitflags;
use crate::http::header::HeaderMap; use crate::http::header::HeaderMap;
use crate::http::{header, Method, StatusCode, Uri, Version}; use crate::http::{header, Method, StatusCode, Uri, Version};
use crate::io::IoRef; use crate::io::{types, IoRef};
use crate::util::Extensions; use crate::util::Extensions;
/// Represents various types of connection /// Represents various types of connection
@ -174,9 +172,10 @@ impl RequestHead {
/// ntex http server, then peer address would be address of this proxy. /// ntex http server, then peer address would be address of this proxy.
#[inline] #[inline]
pub fn peer_addr(&self) -> Option<net::SocketAddr> { pub fn peer_addr(&self) -> Option<net::SocketAddr> {
// TODO! fix self.io
// self.head().peer_addr .as_ref()
None .map(|io| io.query::<types::PeerAddr>().get().map(|addr| addr.0))
.unwrap_or(None)
} }
} }

View file

@ -1,12 +1,10 @@
use std::{cell::Ref, cell::RefMut, fmt, mem, net}; use std::{cell::Ref, cell::RefMut, fmt, mem, net};
use http::{header, Method, Uri, Version}; use crate::http::header::{self, HeaderMap};
use crate::http::header::HeaderMap;
use crate::http::httpmessage::HttpMessage; use crate::http::httpmessage::HttpMessage;
use crate::http::message::{Message, RequestHead}; use crate::http::message::{Message, RequestHead};
use crate::http::payload::Payload; use crate::http::{payload::Payload, Method, Uri, Version};
use crate::io::IoRef; use crate::io::{types, IoRef};
use crate::util::Extensions; use crate::util::Extensions;
/// Request /// Request
@ -139,9 +137,11 @@ impl Request {
/// ntex http server, then peer address would be address of this proxy. /// ntex http server, then peer address would be address of this proxy.
#[inline] #[inline]
pub fn peer_addr(&self) -> Option<net::SocketAddr> { pub fn peer_addr(&self) -> Option<net::SocketAddr> {
// TODO! fix self.head()
// self.head().peer_addr .io
None .as_ref()
.map(|io| io.query::<types::PeerAddr>().get().map(|addr| addr.0))
.unwrap_or(None)
} }
/// Get request's payload /// Get request's payload

View file

@ -6,7 +6,7 @@ use std::{
use h2::server::{self, Handshake}; use h2::server::{self, Handshake};
use crate::codec::{AsyncRead, AsyncWrite}; use crate::codec::{AsyncRead, AsyncWrite};
use crate::io::{DefaultFilter, Filter, Io, IoRef}; use crate::io::{types, DefaultFilter, Filter, Io, IoRef};
use crate::rt::net::TcpStream; use crate::rt::net::TcpStream;
use crate::service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; use crate::service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory};
use crate::time::{Millis, Seconds}; use crate::time::{Millis, Seconds};
@ -438,7 +438,10 @@ where
} }
fn call(&self, io: Self::Request) -> Self::Future { fn call(&self, io: Self::Request) -> Self::Future {
// log::trace!("New http connection protocol {:?}", proto); log::trace!(
"New http connection, peer address {:?}",
io.query::<types::PeerAddr>().get()
);
//match proto { //match proto {
//Protocol::Http2 => todo!(), //Protocol::Http2 => todo!(),

View file

@ -3,7 +3,7 @@ use std::{cell::Ref, cell::RefCell, cell::RefMut, fmt, net, rc::Rc};
use crate::http::{ use crate::http::{
HeaderMap, HttpMessage, Message, Method, Payload, RequestHead, Uri, Version, HeaderMap, HttpMessage, Message, Method, Payload, RequestHead, Uri, Version,
}; };
use crate::io::IoRef; use crate::io::{types, IoRef};
use crate::router::Path; use crate::router::Path;
use crate::util::{Extensions, Ready}; use crate::util::{Extensions, Ready};
@ -118,9 +118,9 @@ impl HttpRequest {
/// ntex http server, then peer address would be address of this proxy. /// ntex http server, then peer address would be address of this proxy.
#[inline] #[inline]
pub fn peer_addr(&self) -> Option<net::SocketAddr> { pub fn peer_addr(&self) -> Option<net::SocketAddr> {
// TODO! fix self.io()
// self.head().peer_addr .map(|io| io.query::<types::PeerAddr>().get().map(|addr| addr.0))
None .unwrap_or(None)
} }
/// Get a reference to the Path parameters. /// Get a reference to the Path parameters.

View file

@ -1,12 +1,9 @@
use std::cell::{Ref, RefMut}; use std::{cell::Ref, cell::RefMut, fmt, marker::PhantomData, net, rc::Rc};
use std::marker::PhantomData;
use std::rc::Rc;
use std::{fmt, net};
use crate::http::{ use crate::http::{
header, HeaderMap, HttpMessage, Method, Payload, RequestHead, Response, Uri, Version, header, HeaderMap, HttpMessage, Method, Payload, RequestHead, Response, Uri, Version,
}; };
use crate::io::IoRef; use crate::io::{types, IoRef};
use crate::router::{Path, Resource}; use crate::router::{Path, Resource};
use crate::util::Extensions; use crate::util::Extensions;
@ -154,6 +151,21 @@ impl<Err> WebRequest<Err> {
} }
} }
/// Peer socket address
///
/// Peer address is actual socket address, if proxy is used in front of
/// ntex http server, then peer address would be address of this proxy.
///
/// To get client connection information `ConnectionInfo` should be used.
#[inline]
pub fn peer_addr(&self) -> Option<net::SocketAddr> {
self.head()
.io
.as_ref()
.map(|io| io.query::<types::PeerAddr>().get().map(|addr| addr.0))
.unwrap_or(None)
}
/// Get *ConnectionInfo* for the current request. /// Get *ConnectionInfo* for the current request.
#[inline] #[inline]
pub fn connection_info(&self) -> Ref<'_, ConnectionInfo> { pub fn connection_info(&self) -> Ref<'_, ConnectionInfo> {