Make IoBoxed into spearate type

This commit is contained in:
Nikolay Kim 2021-12-24 12:27:16 +06:00
parent b459a76df3
commit 093ddadfab
16 changed files with 114 additions and 86 deletions

View file

@ -1,6 +1,8 @@
# Changes
## [0.1.0-b.5] - 2021-12-23
## [0.1.0-b.5] - 2021-12-xx
* Make `IoBoxed` into spearate type
* Add `SealedService` and `SealedFactory` helpers

View file

@ -89,12 +89,16 @@ where
U: Decoder + Encoder + 'static,
{
/// Construct new `Dispatcher` instance.
pub fn new<F: IntoService<S, DispatchItem<U>>>(
io: IoBoxed,
pub fn new<Io, F: IntoService<S, DispatchItem<U>>>(
io: Io,
codec: U,
service: F,
timer: Timer,
) -> Self {
) -> Self
where
IoBoxed: From<Io>,
{
let io = IoBoxed::from(io);
let updated = now();
let ka_timeout = Seconds(30);
@ -551,7 +555,7 @@ mod tests {
ready_err: Cell::new(false),
st: Cell::new(DispatcherState::Processing),
pool: state.memory_pool().pool(),
io: state.seal(),
io: state.into(),
shared,
timer,
ka_timeout,

View file

@ -5,8 +5,6 @@ use ntex_bytes::BytesMut;
use super::io::Flags;
use super::{Filter, IoRef, ReadStatus, WriteStatus};
pub struct Sealed(pub(crate) Box<dyn Filter>);
pub struct Base(IoRef);
impl Base {

View file

@ -6,7 +6,8 @@ use ntex_bytes::{BytesMut, PoolId, PoolRef};
use ntex_codec::{Decoder, Encoder};
use ntex_util::{future::poll_fn, future::Either, task::LocalWaker, time::Millis};
use super::filter::{Base, NullFilter, Sealed};
use super::filter::{Base, NullFilter};
use super::seal::{IoBoxed, Sealed};
use super::tasks::{ReadContext, WriteContext};
use super::{Filter, FilterFactory, Handle, IoStream};
@ -332,6 +333,12 @@ impl<F> Io<F> {
}
}
impl Io<Sealed> {
pub fn boxed(self) -> IoBoxed {
self.into()
}
}
impl<F: Filter> Io<F> {
#[inline]
/// Get referece to filter
@ -346,7 +353,7 @@ impl<F: Filter> Io<F> {
#[inline]
/// Convert current io stream into sealed version
pub fn seal(mut self) -> crate::IoBoxed {
pub fn seal(mut self) -> Io<Sealed> {
// get current filter
let filter = unsafe {
let item = mem::replace(&mut self.1, FilterItem::Ptr(ptr::null_mut()));

View file

@ -10,6 +10,7 @@ mod dispatcher;
mod filter;
mod io;
mod ioref;
mod seal;
mod tasks;
mod time;
mod utils;
@ -24,14 +25,12 @@ use ntex_codec::{Decoder, Encoder};
use ntex_util::time::Millis;
pub use self::dispatcher::Dispatcher;
pub use self::filter::{Base, Sealed};
pub use self::filter::Base;
pub use self::io::{Io, IoRef, OnDisconnect};
pub use self::seal::{IoBoxed, Sealed};
pub use self::tasks::{ReadContext, WriteContext};
pub use self::time::Timer;
pub use self::utils::{filter_factory, seal, sealed_service, SealedFactory, SealedService};
pub type IoBoxed = Io<Sealed>;
pub use self::utils::{add_filter, boxed, seal, Boxed, BoxedFactory};
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum ReadStatus {

28
ntex-io/src/seal.rs Normal file
View file

@ -0,0 +1,28 @@
use std::ops;
use crate::{Filter, Io};
pub struct Sealed(pub(crate) Box<dyn Filter>);
pub struct IoBoxed(Io<Sealed>);
impl From<Io<Sealed>> for IoBoxed {
fn from(io: Io<Sealed>) -> Self {
Self(io)
}
}
impl<F: Filter> From<Io<F>> for IoBoxed {
fn from(io: Io<F>) -> Self {
Self(io.seal())
}
}
impl ops::Deref for IoBoxed {
type Target = Io<Sealed>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}

View file

@ -423,11 +423,11 @@ impl AsyncWrite for IoBoxed {
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Self::poll_flush(&*self, cx, false)
(&*self.as_ref()).poll_flush(cx, false)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Self::poll_shutdown(&*self, cx)
(&*self.as_ref()).poll_shutdown(cx)
}
}

View file

@ -23,25 +23,25 @@ where
let fut = srv.new_service(cfg);
async move {
let srv = fut.await?;
Ok(into_service(move |io: Io<F>| srv.call(io.seal())))
Ok(into_service(move |io: Io<F>| srv.call(IoBoxed::from(io))))
}
})
}
/// Service that converts Io<F> responses from service to the IoBoxed
pub fn sealed_service<S, R, F>(inner: S) -> SealedService<S, R>
pub fn boxed<S, R, F>(inner: S) -> Boxed<S, R>
where
F: Filter,
S: Service<R, Response = Io<F>>,
{
SealedService {
Boxed {
inner,
_t: PhantomData,
}
}
/// Create filter factory service
pub fn filter_factory<T, F>(filter: T) -> FilterServiceFactory<T, F>
pub fn add_filter<T, F>(filter: T) -> FilterServiceFactory<T, F>
where
T: FilterFactory<F> + Clone,
F: Filter,
@ -52,12 +52,12 @@ where
}
}
pub struct SealedFactory<S, R> {
pub struct BoxedFactory<S, R> {
inner: S,
_t: PhantomData<R>,
}
impl<S, R> SealedFactory<S, R> {
impl<S, R> BoxedFactory<S, R> {
pub fn new(inner: S) -> Self {
Self {
inner,
@ -66,37 +66,37 @@ impl<S, R> SealedFactory<S, R> {
}
}
impl<S: Clone, R> Clone for SealedFactory<S, R> {
impl<S: Clone, R> Clone for BoxedFactory<S, R> {
fn clone(&self) -> Self {
Self::new(self.inner.clone())
}
}
impl<S, R, C, F> ServiceFactory<R, C> for SealedFactory<S, R>
impl<S, R, C, F> ServiceFactory<R, C> for BoxedFactory<S, R>
where
F: Filter,
S: ServiceFactory<R, C, Response = Io<F>>,
{
type Response = IoBoxed;
type Error = S::Error;
type Service = SealedService<S::Service, R>;
type Service = Boxed<S::Service, R>;
type InitError = S::InitError;
type Future = SealedFactoryResponse<S, R, C>;
type Future = BoxedFactoryResponse<S, R, C>;
fn new_service(&self, cfg: C) -> Self::Future {
SealedFactoryResponse {
BoxedFactoryResponse {
fut: self.inner.new_service(cfg),
_t: PhantomData,
}
}
}
pub struct SealedService<S, R> {
pub struct Boxed<S, R> {
inner: S,
_t: PhantomData<R>,
}
impl<S, R> SealedService<S, R> {
impl<S, R> Boxed<S, R> {
pub fn new(inner: S) -> Self {
Self {
inner,
@ -105,20 +105,20 @@ impl<S, R> SealedService<S, R> {
}
}
impl<S: Clone, R> Clone for SealedService<S, R> {
impl<S: Clone, R> Clone for Boxed<S, R> {
fn clone(&self) -> Self {
Self::new(self.inner.clone())
}
}
impl<S, R, F> Service<R> for SealedService<S, R>
impl<S, R, F> Service<R> for Boxed<S, R>
where
F: Filter,
S: Service<R, Response = Io<F>>,
{
type Response = IoBoxed;
type Error = S::Error;
type Future = SealedServiceResponse<S, R>;
type Future = BoxedResponse<S, R>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
self.inner.poll_ready(cx)
@ -130,45 +130,43 @@ where
}
fn call(&self, req: R) -> Self::Future {
SealedServiceResponse {
BoxedResponse {
fut: self.inner.call(req),
}
}
}
pin_project_lite::pin_project! {
pub struct SealedFactoryResponse<S: ServiceFactory<R, C>, R, C> {
pub struct BoxedFactoryResponse<S: ServiceFactory<R, C>, R, C> {
#[pin]
fut: S::Future,
_t: PhantomData<(R, C)>
}
}
impl<S: ServiceFactory<R, C>, R, C> Future for SealedFactoryResponse<S, R, C> {
type Output = Result<SealedService<S::Service, R>, S::InitError>;
impl<S: ServiceFactory<R, C>, R, C> Future for BoxedFactoryResponse<S, R, C> {
type Output = Result<Boxed<S::Service, R>, S::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(
ready!(self.project().fut.poll(cx)).map(|inner| SealedService {
inner,
_t: PhantomData,
}),
)
Poll::Ready(ready!(self.project().fut.poll(cx)).map(|inner| Boxed {
inner,
_t: PhantomData,
}))
}
}
pin_project_lite::pin_project! {
pub struct SealedServiceResponse<S: Service<R>, R> {
pub struct BoxedResponse<S: Service<R>, R> {
#[pin]
fut: S::Future,
}
}
impl<S: Service<R, Response = Io<F>>, R, F: Filter> Future for SealedServiceResponse<S, R> {
impl<S: Service<R, Response = Io<F>>, R, F: Filter> Future for BoxedResponse<S, R> {
type Output = Result<IoBoxed, S::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(ready!(self.project().fut.poll(cx)).map(|io| io.seal()))
Poll::Ready(ready!(self.project().fut.poll(cx)).map(IoBoxed::from))
}
}

View file

@ -1,7 +1,7 @@
use std::{fs::File, io, io::BufReader, sync::Arc};
use ntex::service::{fn_service, pipeline_factory};
use ntex::{codec, io::filter_factory, io::Io, server, util::Either};
use ntex::{codec, io::add_filter, io::Io, server, util::Either};
use ntex_tls::rustls::TlsAcceptor;
use rustls_pemfile::{certs, rsa_private_keys};
use tls_rust::{Certificate, PrivateKey, ServerConfig};
@ -35,7 +35,7 @@ async fn main() -> io::Result<()> {
// start server
server::ServerBuilder::new()
.bind("basic", "127.0.0.1:8443", move |_| {
pipeline_factory(filter_factory(TlsAcceptor::new(tls_config.clone()))).and_then(
pipeline_factory(add_filter(TlsAcceptor::new(tls_config.clone()))).and_then(
fn_service(|io: Io<_>| async move {
println!("New client is connected");

View file

@ -1,7 +1,7 @@
use std::io;
use ntex::service::{fn_service, pipeline_factory};
use ntex::{codec, io::filter_factory, io::Io, server, util::Either};
use ntex::{codec, io::add_filter, io::Io, server, util::Either};
use ntex_tls::openssl::SslAcceptor;
use tls_openssl::ssl::{self, SslFiletype, SslMethod};
@ -25,7 +25,7 @@ async fn main() -> io::Result<()> {
// start server
server::ServerBuilder::new()
.bind("basic", "127.0.0.1:8443", move |_| {
pipeline_factory(filter_factory(SslAcceptor::new(acceptor.clone()))).and_then(
pipeline_factory(add_filter(SslAcceptor::new(acceptor.clone()))).and_then(
fn_service(|io: Io<_>| async move {
println!("New client is connected");
loop {

View file

@ -1,7 +1,7 @@
use std::task::{Context, Poll};
use std::{error::Error, future::Future, marker::PhantomData, pin::Pin};
use ntex_io::{Filter, FilterFactory, Io, SealedFactory};
use ntex_io::{BoxedFactory, Filter, FilterFactory, Io};
use ntex_service::{Service, ServiceFactory};
use ntex_util::{future::Ready, time::Millis};
use tls_openssl::ssl::SslAcceptor;
@ -36,8 +36,8 @@ impl<F> Acceptor<F> {
self
}
pub fn seal(self) -> SealedFactory<Acceptor<F>, Io<F>> {
SealedFactory::new(self)
pub fn seal(self) -> BoxedFactory<Acceptor<F>, Io<F>> {
BoxedFactory::new(self)
}
}

View file

@ -5,7 +5,7 @@ pub use tls_openssl::ssl::{Error as SslError, HandshakeError, SslConnector, SslM
use ntex_tls::openssl::SslConnector as IoSslConnector;
use crate::io::{Base, Io, SealedService};
use crate::io::{Base, Boxed, Io};
use crate::service::{Service, ServiceFactory};
use crate::util::{PoolId, Ready};
@ -78,8 +78,8 @@ impl<T: Address + 'static> Connector<T> {
}
/// Produce sealed io stream (IoBoxed)
pub fn seal(self) -> SealedService<Connector<T>, Connect<T>> {
SealedService::new(self)
pub fn seal(self) -> Boxed<Connector<T>, Connect<T>> {
Boxed::new(self)
}
}

View file

@ -5,7 +5,7 @@ pub use tls_rustls::{ClientConfig, ServerName};
use ntex_tls::rustls::TlsConnector;
use crate::io::{Base, Io, SealedService};
use crate::io::{Base, Boxed, Io};
use crate::service::{Service, ServiceFactory};
use crate::util::{PoolId, Ready};
@ -82,8 +82,8 @@ impl<T: Address + 'static> Connector<T> {
}
/// Produce sealed io stream (IoBoxed)
pub fn seal(self) -> SealedService<Connector<T>, Connect<T>> {
SealedService::new(self)
pub fn seal(self) -> Boxed<Connector<T>, Connect<T>> {
Boxed::new(self)
}
}

View file

@ -1,7 +1,7 @@
use std::task::{Context, Poll};
use std::{collections::VecDeque, future::Future, io, net::SocketAddr, pin::Pin};
use crate::io::{types, Io, SealedService};
use crate::io::{types, Boxed, Io};
use crate::rt::tcp_connect_in;
use crate::service::{Service, ServiceFactory};
use crate::util::{Either, PoolId, PoolRef, Ready};
@ -45,8 +45,8 @@ impl<T: Address> Connector<T> {
}
/// Produce sealed io stream (IoBoxed)
pub fn seal(self) -> SealedService<Connector<T>, Io> {
SealedService::new(self)
pub fn seal(self) -> Boxed<Connector<T>, Io> {
Boxed::new(self)
}
}

View file

@ -2,7 +2,7 @@ use std::{rc::Rc, task::Context, task::Poll, time::Duration};
use crate::connect::{Connect as TcpConnect, Connector as TcpConnector};
use crate::http::Uri;
use crate::io::{Filter, Io, IoBoxed};
use crate::io::IoBoxed;
use crate::service::{apply_fn, boxed, Service};
use crate::time::{Millis, Seconds};
use crate::util::timeout::{TimeoutError, TimeoutService};
@ -55,7 +55,7 @@ impl Connector {
let conn = Connector {
connector: boxed::service(
TcpConnector::new()
.map(|io| io.seal())
.map(IoBoxed::from)
.map_err(ConnectError::from),
),
ssl_connector: None,
@ -175,43 +175,35 @@ impl Connector {
}
/// Use custom connector to open un-secured connections.
pub fn connector<T, F>(mut self, connector: T) -> Self
pub fn connector<Io, T>(mut self, connector: T) -> Self
where
T: Service<TcpConnect<Uri>, Response = Io<F>, Error = crate::connect::ConnectError>
T: Service<TcpConnect<Uri>, Response = Io, Error = crate::connect::ConnectError>
+ 'static,
F: Filter,
IoBoxed: From<Io>,
{
self.connector =
boxed::service(connector.map(|io| io.seal()).map_err(ConnectError::from));
self.connector = boxed::service(
connector
.map(|io| IoBoxed::from(io))
.map_err(ConnectError::from),
);
self
}
/// Use custom connector to open secure connections.
pub fn secure_connector<T, F>(mut self, connector: T) -> Self
pub fn secure_connector<Io, T>(mut self, connector: T) -> Self
where
T: Service<TcpConnect<Uri>, Response = Io<F>, Error = crate::connect::ConnectError>
T: Service<TcpConnect<Uri>, Response = Io, Error = crate::connect::ConnectError>
+ 'static,
F: Filter,
IoBoxed: From<Io>,
{
self.ssl_connector = Some(boxed::service(
connector.map(|io| io.seal()).map_err(ConnectError::from),
connector
.map(|io| IoBoxed::from(io))
.map_err(ConnectError::from),
));
self
}
/// Use custom boxed connector to open secure connections.
pub fn boxed_secure_connector<T>(mut self, connector: T) -> Self
where
T: Service<
TcpConnect<Uri>,
Response = IoBoxed,
Error = crate::connect::ConnectError,
> + 'static,
{
self.ssl_connector = Some(boxed::service(connector.map_err(ConnectError::from)));
self
}
/// Finish configuration process and create connector service.
/// The Connector builder always concludes by calling `finish()` last in
/// its combinator chain.

View file

@ -532,7 +532,7 @@ mod tests {
fn_service(move |req| {
let (client, server) = Io::create();
store2.borrow_mut().push((req, server));
Box::pin(async move { Ok(nio::Io::new(client).seal()) })
Box::pin(async move { Ok(IoBoxed::from(nio::Io::new(client))) })
}),
Duration::from_secs(10),
Duration::from_secs(10),