Drop deprecated ntex::framed mod

This commit is contained in:
Nikolay Kim 2020-09-06 11:15:31 +06:00
parent cea261ef20
commit a66efe4256
7 changed files with 4 additions and 605 deletions

View file

@ -1,5 +1,7 @@
# Changes
* Drop deprecated ntex::framed mod
## [0.1.23] - 2020-09-04
* Fix http1 pipeline requests with payload handling

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.1.23"
version = "0.1.24-dev"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"

View file

@ -1,187 +0,0 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Stream;
use crate::channel::mpsc::Receiver;
use crate::codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
pub struct Handshake<Io, Codec>
where
Codec: Encoder + Decoder,
{
io: Io,
_t: PhantomData<Codec>,
}
impl<Io, Codec> Handshake<Io, Codec>
where
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder,
{
pub(crate) fn new(io: Io) -> Self {
Self {
io,
_t: PhantomData,
}
}
pub fn codec(
self,
codec: Codec,
) -> HandshakeResult<Io, (), Codec, Receiver<<Codec as Encoder>::Item>> {
HandshakeResult {
state: (),
out: None,
framed: Framed::new(self.io, codec),
}
}
}
pin_project_lite::pin_project! {
pub struct HandshakeResult<Io, St, Codec, Out> {
pub(crate) state: St,
pub(crate) out: Option<Out>,
pub(crate) framed: Framed<Io, Codec>,
}
}
impl<Io, St, Codec: Encoder + Decoder, Out: Unpin> HandshakeResult<Io, St, Codec, Out> {
#[inline]
pub fn io(&mut self) -> &mut Framed<Io, Codec> {
&mut self.framed
}
pub fn out<U>(self, out: U) -> HandshakeResult<Io, St, Codec, U>
where
U: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
HandshakeResult {
state: self.state,
framed: self.framed,
out: Some(out),
}
}
#[inline]
pub fn state<S>(self, state: S) -> HandshakeResult<Io, S, Codec, Out> {
HandshakeResult {
state,
framed: self.framed,
out: self.out,
}
}
}
impl<Io, St, Codec, Out> Stream for HandshakeResult<Io, St, Codec, Out>
where
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder,
{
type Item = Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>;
#[inline]
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.framed.next_item(cx)
}
}
impl<Io, St, Codec, Out> futures::Sink<Codec::Item>
for HandshakeResult<Io, St, Codec, Out>
where
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Unpin,
Codec::Error: From<std::io::Error>,
{
type Error = Codec::Error;
#[inline]
fn poll_ready(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.framed).poll_ready(cx)
}
#[inline]
fn start_send(
mut self: Pin<&mut Self>,
item: <Codec as Encoder>::Item,
) -> Result<(), Self::Error> {
Pin::new(&mut self.framed).start_send(item)
}
#[inline]
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.framed).poll_flush(cx)
}
#[inline]
fn poll_close(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.framed).poll_close(cx)
}
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
use futures::future::lazy;
use futures::{Sink, StreamExt};
use ntex_codec::BytesCodec;
use super::*;
use crate::testing::Io;
#[allow(clippy::declare_interior_mutable_const)]
const BLOB: Bytes = Bytes::from_static(b"GET /test HTTP/1.1\r\n\r\n");
#[ntex_rt::test]
async fn test_result() {
let (client, server) = Io::create();
client.remote_buffer_cap(1024);
let server = Framed::new(server, BytesCodec);
let mut hnd = HandshakeResult {
state: (),
out: Some(()),
framed: server,
};
client.write(BLOB);
let item = hnd.next().await.unwrap().unwrap();
assert_eq!(item, BLOB);
assert!(lazy(|cx| Pin::new(&mut hnd).poll_ready(cx))
.await
.is_ready());
Pin::new(&mut hnd).start_send(BLOB).unwrap();
assert_eq!(client.read_any(), b"".as_ref());
assert_eq!(hnd.io().read_buf(), b"".as_ref());
assert_eq!(hnd.io().write_buf(), &BLOB[..]);
assert!(lazy(|cx| Pin::new(&mut hnd).poll_flush(cx))
.await
.is_ready());
assert_eq!(client.read_any(), &BLOB[..]);
assert!(lazy(|cx| Pin::new(&mut hnd).poll_close(cx))
.await
.is_pending());
client.close().await;
assert!(lazy(|cx| Pin::new(&mut hnd).poll_close(cx))
.await
.is_ready());
assert!(client.is_closed());
}
}

View file

@ -1,9 +0,0 @@
mod handshake;
mod service;
pub use self::handshake::{Handshake, HandshakeResult};
pub use self::service::{Builder, FactoryBuilder};
pub use crate::util::framed::DispatcherError as ServiceError;
pub type Connect<T, U> = Handshake<T, U>;
pub type ConnectResult<Io, St, Codec, Out> = HandshakeResult<Io, St, Codec, Out>;

View file

@ -1,343 +0,0 @@
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use futures::{ready, Stream};
use crate::codec::{AsyncRead, AsyncWrite, Decoder, Encoder};
use crate::service::{IntoService, IntoServiceFactory, Service, ServiceFactory};
use crate::util::framed::Dispatcher;
use super::handshake::{Handshake, HandshakeResult};
use super::ServiceError;
type RequestItem<U> = <U as Decoder>::Item;
type ResponseItem<U> = Option<<U as Encoder>::Item>;
/// Service builder - structure that follows the builder pattern
/// for building instances for framed services.
pub struct Builder<St, C, Io, Codec, Out> {
connect: C,
disconnect_timeout: usize,
_t: PhantomData<(St, Io, Codec, Out)>,
}
impl<St, C, Io, Codec, Out> Builder<St, C, Io, Codec, Out>
where
C: Service<
Request = Handshake<Io, Codec>,
Response = HandshakeResult<Io, St, Codec, Out>,
>,
C::Error: fmt::Debug,
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
/// Construct framed handler service with specified connect service
pub fn new<F>(connect: F) -> Builder<St, C, Io, Codec, Out>
where
F: IntoService<C>,
{
Builder {
connect: connect.into_service(),
disconnect_timeout: 3000,
_t: PhantomData,
}
}
/// Set connection disconnect timeout in milliseconds.
///
/// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
/// within this time, the connection get dropped.
///
/// To disable timeout set value to 0.
///
/// By default disconnect timeout is set to 3 seconds.
pub fn disconnect_timeout(mut self, val: usize) -> Self {
self.disconnect_timeout = val;
self
}
/// Provide stream items handler service and construct service factory.
pub fn build<F, T>(self, service: F) -> FramedServiceImpl<St, C, T, Io, Codec, Out>
where
F: IntoServiceFactory<T>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
{
FramedServiceImpl {
connect: self.connect,
handler: Rc::new(service.into_factory()),
disconnect_timeout: self.disconnect_timeout,
_t: PhantomData,
}
}
}
/// Service builder - structure that follows the builder pattern
/// for building instances for framed services.
pub struct FactoryBuilder<St, C, Io, Codec, Out> {
connect: C,
disconnect_timeout: usize,
_t: PhantomData<(St, Io, Codec, Out)>,
}
impl<St, C, Io, Codec, Out> FactoryBuilder<St, C, Io, Codec, Out>
where
Io: AsyncRead + AsyncWrite + Unpin,
C: ServiceFactory<
Config = (),
Request = Handshake<Io, Codec>,
Response = HandshakeResult<Io, St, Codec, Out>,
>,
C::Error: fmt::Debug,
Codec: Decoder + Encoder,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
/// Construct framed handler service factory with specified connect service
pub fn new<F>(connect: F) -> FactoryBuilder<St, C, Io, Codec, Out>
where
F: IntoServiceFactory<C>,
{
FactoryBuilder {
connect: connect.into_factory(),
disconnect_timeout: 3000,
_t: PhantomData,
}
}
/// Set connection disconnect timeout in milliseconds.
///
/// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
/// within this time, the connection get dropped.
///
/// To disable timeout set value to 0.
///
/// By default disconnect timeout is set to 3 seconds.
pub fn disconnect_timeout(mut self, val: usize) -> Self {
self.disconnect_timeout = val;
self
}
pub fn build<F, T, Cfg>(
self,
service: F,
) -> FramedService<St, C, T, Io, Codec, Out, Cfg>
where
F: IntoServiceFactory<T>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
{
FramedService {
connect: self.connect,
handler: Rc::new(service.into_factory()),
disconnect_timeout: self.disconnect_timeout,
_t: PhantomData,
}
}
}
pub struct FramedService<St, C, T, Io, Codec, Out, Cfg> {
connect: C,
handler: Rc<T>,
disconnect_timeout: usize,
_t: PhantomData<(St, Io, Codec, Out, Cfg)>,
}
impl<St, C, T, Io, Codec, Out, Cfg> ServiceFactory
for FramedService<St, C, T, Io, Codec, Out, Cfg>
where
Io: AsyncRead + AsyncWrite + Unpin,
C: ServiceFactory<
Config = (),
Request = Handshake<Io, Codec>,
Response = HandshakeResult<Io, St, Codec, Out>,
>,
C::Error: fmt::Debug,
<C::Service as Service>::Future: 'static,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
> + 'static,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
type Config = Cfg;
type Request = Io;
type Response = ();
type Error = ServiceError<C::Error, Codec>;
type InitError = C::InitError;
type Service = FramedServiceImpl<St, C::Service, T, Io, Codec, Out>;
type Future = FramedServiceResponse<St, C, T, Io, Codec, Out>;
fn new_service(&self, _: Cfg) -> Self::Future {
// create connect service and then create service impl
FramedServiceResponse {
fut: self.connect.new_service(()),
handler: self.handler.clone(),
disconnect_timeout: self.disconnect_timeout,
}
}
}
#[pin_project::pin_project]
pub struct FramedServiceResponse<St, C, T, Io, Codec, Out>
where
Io: AsyncRead + AsyncWrite + Unpin,
C: ServiceFactory<
Config = (),
Request = Handshake<Io, Codec>,
Response = HandshakeResult<Io, St, Codec, Out>,
>,
C::Error: fmt::Debug,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
#[pin]
fut: C::Future,
handler: Rc<T>,
disconnect_timeout: usize,
}
impl<St, C, T, Io, Codec, Out> Future for FramedServiceResponse<St, C, T, Io, Codec, Out>
where
Io: AsyncRead + AsyncWrite + Unpin,
C: ServiceFactory<
Config = (),
Request = Handshake<Io, Codec>,
Response = HandshakeResult<Io, St, Codec, Out>,
>,
C::Error: fmt::Debug,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
type Output =
Result<FramedServiceImpl<St, C::Service, T, Io, Codec, Out>, C::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let connect = ready!(this.fut.poll(cx))?;
Poll::Ready(Ok(FramedServiceImpl {
connect,
handler: this.handler.clone(),
disconnect_timeout: *this.disconnect_timeout,
_t: PhantomData,
}))
}
}
pub struct FramedServiceImpl<St, C, T, Io, Codec, Out> {
connect: C,
handler: Rc<T>,
disconnect_timeout: usize,
_t: PhantomData<(St, Io, Codec, Out)>,
}
impl<St, C, T, Io, Codec, Out> Service for FramedServiceImpl<St, C, T, Io, Codec, Out>
where
Io: AsyncRead + AsyncWrite + Unpin,
C: Service<
Request = Handshake<Io, Codec>,
Response = HandshakeResult<Io, St, Codec, Out>,
>,
C::Error: fmt::Debug,
C::Future: 'static,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
> + 'static,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
type Request = Io;
type Response = ();
type Error = ServiceError<C::Error, Codec>;
type Future = Pin<Box<dyn Future<Output = Result<(), Self::Error>>>>;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.connect.poll_ready(cx).map_err(|e| e.into())
}
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.connect.poll_shutdown(cx, is_error)
}
#[inline]
fn call(&self, req: Io) -> Self::Future {
log::trace!("Start connection handshake");
let handler = self.handler.clone();
let timeout = self.disconnect_timeout;
let handshake = self.connect.call(Handshake::new(req));
Box::pin(async move {
let result = handshake.await.map_err(|e| {
log::trace!("Connection handshake failed: {:?}", e);
e
})?;
log::trace!("Connection handshake succeeded");
let handler = handler.new_service(result.state).await?;
log::trace!("Connection handler is created, starting dispatcher");
Dispatcher::with(result.framed, result.out, handler)
.disconnect_timeout(timeout as u64)
.await
})
}
}

View file

@ -43,8 +43,4 @@ pub mod util;
pub mod web;
pub mod ws;
pub use self::service::*;
#[doc(hidden)]
#[deprecated(since = "0.1.20")]
pub mod framed;
pub use self::service::*;

View file

@ -1,60 +0,0 @@
#![allow(deprecated)]
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::time::Duration;
use bytes::{Bytes, BytesMut};
use futures::future::ok;
use ntex::channel::mpsc;
use ntex::codec::BytesCodec;
use ntex::framed::{Builder, FactoryBuilder, Handshake};
use ntex::rt::time::delay_for;
use ntex::server::test_server;
use ntex::{fn_factory_with_config, fn_service, IntoService, Service};
#[derive(Clone)]
struct State(Option<mpsc::Sender<Bytes>>);
#[ntex::test]
async fn test_basic() {
let client_item = Rc::new(Cell::new(false));
let srv = test_server(move || {
FactoryBuilder::new(fn_service(|conn: Handshake<_, _>| async move {
delay_for(Duration::from_millis(50)).await;
Ok(conn.codec(BytesCodec).state(State(None)))
}))
.disconnect_timeout(3000)
// echo
.build(fn_service(|t: BytesMut| ok(Some(t.freeze()))))
});
let item = client_item.clone();
let client = Builder::new(fn_service(move |conn: Handshake<_, _>| async move {
let (tx, rx) = mpsc::channel();
let _ = tx.send(Bytes::from_static(b"Hello"));
Ok(conn.codec(BytesCodec).out(rx).state(State(Some(tx))))
}))
.disconnect_timeout(3000)
.build(fn_factory_with_config(move |cfg: State| {
let item = item.clone();
let cfg = RefCell::new(cfg);
ok((move |t: BytesMut| {
assert_eq!(t.freeze(), Bytes::from_static(b"Hello"));
item.set(true);
// drop Sender, which will close connection
cfg.borrow_mut().0.take();
ok::<_, ()>(None)
})
.into_service())
}));
let conn = ntex::connect::Connector::default()
.call(ntex::connect::Connect::with(String::new(), srv.addr()))
.await
.unwrap();
client.call(conn).await.unwrap();
assert!(client_item.get());
}