reduce futures crate dependencies

This commit is contained in:
Nikolay Kim 2021-04-03 13:25:20 +06:00
parent d56309c64f
commit cb9e3ffeda
92 changed files with 1004 additions and 614 deletions

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-codec"
version = "0.4.0"
version = "0.4.1"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
@ -18,12 +18,12 @@ path = "src/lib.rs"
[dependencies]
bitflags = "1.2.1"
bytes = "1.0"
either = "1.6.1"
futures-core = "0.3.12"
futures-sink = "0.3.12"
ntex-service = "0.1.7"
futures-core = { version = "0.3.13", default-features = false, features = ["alloc"] }
futures-sink = { version = "0.3.13", default-features = false, features = ["alloc"] }
log = "0.4"
tokio = { version = "1", default-features=false }
[dev-dependencies]
ntex = "0.3.0-b.1"
ntex = "0.3.13"
futures = "0.3.13"

View file

@ -3,9 +3,9 @@ use std::task::{Context, Poll};
use std::{fmt, io};
use bytes::{Buf, BytesMut};
use either::Either;
use futures_core::{ready, Stream};
use futures_sink::Sink;
use ntex_service::util::Either;
use crate::{AsyncRead, AsyncWrite, Decoder, Encoder};

View file

@ -1,5 +1,9 @@
# Changes
## [0.2.2] - 2021-04-03
* precise futures crate dependency
## [0.2.1] - 2021-02-25
* Drop macros

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-rt"
version = "0.2.1"
version = "0.2.2"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex runtime"
keywords = ["network", "framework", "async", "futures"]
@ -16,5 +16,5 @@ name = "ntex_rt"
path = "src/lib.rs"
[dependencies]
futures = "0.3.13"
tokio = { version = "1", default-features=false, features = ["rt", "net", "time", "signal"] }
ntex-service = "0.1.7"
tokio = { version = "1", default-features=false, features = ["rt", "net", "time", "signal", "sync"] }

View file

@ -1,14 +1,10 @@
use std::any::{Any, TypeId};
use std::cell::RefCell;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::{fmt, thread};
use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, thread};
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot::{channel, Canceled, Sender};
use futures::{Future, Stream};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::{channel, error::RecvError, Sender};
use tokio::task::LocalSet;
use super::runtime::Runtime;
@ -56,7 +52,7 @@ impl Default for Arbiter {
impl Arbiter {
pub(super) fn new_system(local: &LocalSet) -> Self {
let (tx, rx) = unbounded();
let (tx, rx) = unbounded_channel();
let arb = Arbiter::with_sender(tx);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
@ -78,7 +74,7 @@ impl Arbiter {
/// Stop arbiter from continuing it's event loop.
pub fn stop(&self) {
let _ = self.sender.unbounded_send(ArbiterCommand::Stop);
let _ = self.sender.send(ArbiterCommand::Stop);
}
/// Spawn new thread and run event loop in spawned thread.
@ -87,13 +83,13 @@ impl Arbiter {
let id = COUNT.fetch_add(1, Ordering::Relaxed);
let name = format!("ntex-rt:worker:{}", id);
let sys = System::current();
let (arb_tx, arb_rx) = unbounded();
let (arb_tx, arb_rx) = unbounded_channel();
let arb_tx2 = arb_tx.clone();
let handle = thread::Builder::new()
.name(name.clone())
.spawn(move || {
let rt = Runtime::new().expect("Can not create Runtime");
let rt = Runtime::new().expect("Cannot create Runtime");
let arb = Arbiter::with_sender(arb_tx);
let (stop, stop_rx) = channel();
@ -111,7 +107,7 @@ impl Arbiter {
// register arbiter
let _ = System::current()
.sys()
.unbounded_send(SystemCommand::RegisterArbiter(id, arb));
.send(SystemCommand::RegisterArbiter(id, arb));
// run loop
let _ = rt.block_on(stop_rx);
@ -119,7 +115,7 @@ impl Arbiter {
// unregister arbiter
let _ = System::current()
.sys()
.unbounded_send(SystemCommand::UnregisterArbiter(id));
.send(SystemCommand::UnregisterArbiter(id));
})
.unwrap_or_else(|err| {
panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err)
@ -136,15 +132,13 @@ impl Arbiter {
where
F: Future<Output = ()> + Send + Unpin + 'static,
{
let _ = self
.sender
.unbounded_send(ArbiterCommand::Execute(Box::new(future)));
let _ = self.sender.send(ArbiterCommand::Execute(Box::new(future)));
}
/// Send a function to the Arbiter's thread. This function will be executed asynchronously.
/// A future is created, and when resolved will contain the result of the function sent
/// to the Arbiters thread.
pub fn exec<F, R>(&self, f: F) -> impl Future<Output = Result<R, Canceled>>
pub fn exec<F, R>(&self, f: F) -> impl Future<Output = Result<R, RecvError>>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
@ -152,8 +146,8 @@ impl Arbiter {
let (tx, rx) = channel();
let _ = self
.sender
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
if !tx.is_canceled() {
.send(ArbiterCommand::ExecuteFn(Box::new(move || {
if !tx.is_closed() {
let _ = tx.send(f());
}
})));
@ -168,7 +162,7 @@ impl Arbiter {
{
let _ = self
.sender
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
.send(ArbiterCommand::ExecuteFn(Box::new(move || {
f();
})));
}
@ -261,7 +255,7 @@ impl Future for ArbiterController {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match Pin::new(&mut self.rx).poll_next(cx) {
match Pin::new(&mut self.rx).poll_recv(cx) {
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(item)) => match item {
ArbiterCommand::Stop => {
@ -315,7 +309,7 @@ impl Future for SystemArbiter {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match Pin::new(&mut self.commands).poll_next(cx) {
match Pin::new(&mut self.commands).poll_recv(cx) {
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(cmd)) => match cmd {
SystemCommand::Exit(code) => {

View file

@ -1,9 +1,8 @@
use std::borrow::Cow;
use std::io;
use std::{borrow::Cow, future::Future, io};
use futures::channel::mpsc::unbounded;
use futures::channel::oneshot::{channel, Receiver};
use futures::future::{lazy, Future, FutureExt};
use ntex_service::util::lazy;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::oneshot::{channel, Receiver};
use tokio::task::LocalSet;
use super::arbiter::{Arbiter, SystemArbiter};
@ -74,7 +73,7 @@ impl Builder {
fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner {
let (stop_tx, stop) = channel();
let (sys_sender, sys_receiver) = unbounded();
let (sys_sender, sys_receiver) = unbounded_channel();
let system = System::construct(
sys_sender,
@ -96,7 +95,7 @@ impl Builder {
F: FnOnce() + 'static,
{
let (stop_tx, stop) = channel();
let (sys_sender, sys_receiver) = unbounded();
let (sys_sender, sys_receiver) = unbounded_channel();
let rt = Runtime::new().unwrap();
@ -129,7 +128,7 @@ impl AsyncSystemRunner {
let AsyncSystemRunner { stop, .. } = self;
// run loop
lazy(|_| async {
async move {
match stop.await {
Ok(code) => {
if code != 0 {
@ -143,8 +142,7 @@ impl AsyncSystemRunner {
}
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
}
})
.flatten()
}
}
}

View file

@ -1,5 +1,6 @@
//! A runtime implementation that runs everything on the current thread.
use futures::future::{self, Future, FutureExt};
use ntex_service::util::lazy;
use std::future::Future;
mod arbiter;
mod builder;
@ -21,7 +22,7 @@ pub use self::system::System;
#[inline]
pub fn spawn<F>(f: F) -> self::task::JoinHandle<F::Output>
where
F: futures::Future + 'static,
F: Future + 'static,
{
tokio::task::spawn_local(f)
}
@ -39,7 +40,10 @@ where
F: FnOnce() -> R + 'static,
R: Future + 'static,
{
tokio::task::spawn_local(future::lazy(|_| f()).flatten())
tokio::task::spawn_local(async move {
let r = lazy(|_| f()).await;
r.await
})
}
/// Asynchronous signal handling

View file

@ -1,8 +1,6 @@
use std::cell::RefCell;
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::channel::mpsc::UnboundedSender;
use std::{cell::RefCell, io};
use tokio::sync::mpsc::UnboundedSender;
use super::arbiter::{Arbiter, SystemCommand};
use super::builder::{Builder, SystemRunner};
@ -83,7 +81,7 @@ impl System {
/// Stop the system with a particular exit code.
pub fn stop_with_code(&self, code: i32) {
let _ = self.sys.unbounded_send(SystemCommand::Exit(code));
let _ = self.sys.send(SystemCommand::Exit(code));
}
pub(super) fn sys(&self) -> &UnboundedSender<SystemCommand> {

208
ntex-service/src/either.rs Normal file
View file

@ -0,0 +1,208 @@
use std::{error, fmt, future::Future, pin::Pin, task::Context, task::Poll};
/// Combines two different futures, streams, or sinks having the same associated types into a single
/// type.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub enum Either<A, B> {
/// First branch of the type
Left(/* #[pin] */ A),
/// Second branch of the type
Right(/* #[pin] */ B),
}
impl<A, B> Either<A, B> {
fn project(self: Pin<&mut Self>) -> Either<Pin<&mut A>, Pin<&mut B>> {
unsafe {
match self.get_unchecked_mut() {
Either::Left(a) => Either::Left(Pin::new_unchecked(a)),
Either::Right(b) => Either::Right(Pin::new_unchecked(b)),
}
}
}
#[inline]
/// Return true if the value is the `Left` variant.
///
/// ```
/// use either::*;
///
/// let values = [Left(1), Right("the right value")];
/// assert_eq!(values[0].is_left(), true);
/// assert_eq!(values[1].is_left(), false);
/// ```
pub fn is_left(&self) -> bool {
match *self {
Either::Left(_) => true,
Either::Right(_) => false,
}
}
#[inline]
/// Return true if the value is the `Right` variant.
///
/// ```
/// use either::*;
///
/// let values = [Left(1), Right("the right value")];
/// assert_eq!(values[0].is_right(), false);
/// assert_eq!(values[1].is_right(), true);
/// ```
pub fn is_right(&self) -> bool {
!self.is_left()
}
#[inline]
/// Convert the left side of `Either<L, R>` to an `Option<L>`.
///
/// ```
/// use either::*;
///
/// let left: Either<_, ()> = Left("some value");
/// assert_eq!(left.left(), Some("some value"));
///
/// let right: Either<(), _> = Right(321);
/// assert_eq!(right.left(), None);
/// ```
pub fn left(self) -> Option<A> {
match self {
Either::Left(l) => Some(l),
Either::Right(_) => None,
}
}
#[inline]
/// Convert the right side of `Either<L, R>` to an `Option<R>`.
///
/// ```
/// use either::*;
///
/// let left: Either<_, ()> = Left("some value");
/// assert_eq!(left.right(), None);
///
/// let right: Either<(), _> = Right(321);
/// assert_eq!(right.right(), Some(321));
/// ```
pub fn right(self) -> Option<B> {
match self {
Either::Left(_) => None,
Either::Right(r) => Some(r),
}
}
/// Convert `&Either<L, R>` to `Either<&L, &R>`.
///
/// ```
/// use either::*;
///
/// let left: Either<_, ()> = Left("some value");
/// assert_eq!(left.as_ref(), Left(&"some value"));
///
/// let right: Either<(), _> = Right("some value");
/// assert_eq!(right.as_ref(), Right(&"some value"));
/// ```
pub fn as_ref(&self) -> Either<&A, &B> {
match *self {
Either::Left(ref inner) => Either::Left(inner),
Either::Right(ref inner) => Either::Right(inner),
}
}
/// Convert `&mut Either<L, R>` to `Either<&mut L, &mut R>`.
///
/// ```
/// use either::*;
///
/// fn mutate_left(value: &mut Either<u32, u32>) {
/// if let Some(l) = value.as_mut().left() {
/// *l = 999;
/// }
/// }
///
/// let mut left = Left(123);
/// let mut right = Right(123);
/// mutate_left(&mut left);
/// mutate_left(&mut right);
/// assert_eq!(left, Left(999));
/// assert_eq!(right, Right(123));
/// ```
pub fn as_mut(&mut self) -> Either<&mut A, &mut B> {
match *self {
Either::Left(ref mut inner) => Either::Left(inner),
Either::Right(ref mut inner) => Either::Right(inner),
}
}
}
impl<A, B, T> Either<(T, A), (T, B)> {
#[inline]
/// Factor out a homogeneous type from an either of pairs.
///
/// Here, the homogeneous type is the first element of the pairs.
pub fn factor_first(self) -> (T, Either<A, B>) {
match self {
Either::Left((x, a)) => (x, Either::Left(a)),
Either::Right((x, b)) => (x, Either::Right(b)),
}
}
}
impl<A, B, T> Either<(A, T), (B, T)> {
#[inline]
/// Factor out a homogeneous type from an either of pairs.
///
/// Here, the homogeneous type is the second element of the pairs.
pub fn factor_second(self) -> (Either<A, B>, T) {
match self {
Either::Left((a, x)) => (Either::Left(a), x),
Either::Right((b, x)) => (Either::Right(b), x),
}
}
}
impl<T> Either<T, T> {
#[inline]
/// Extract the value of an either over two equivalent types.
pub fn into_inner(self) -> T {
match self {
Either::Left(x) => x,
Either::Right(x) => x,
}
}
}
/// `Either` implements `Error` if *both* `A` and `B` implement it.
impl<A, B> error::Error for Either<A, B>
where
A: error::Error,
B: error::Error,
{
}
impl<A, B> fmt::Display for Either<A, B>
where
A: fmt::Display,
B: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Either::Left(a) => a.fmt(f),
Either::Right(b) => b.fmt(f),
}
}
}
impl<A, B> Future for Either<A, B>
where
A: Future,
B: Future<Output = A::Output>,
{
type Output = A::Output;
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
Either::Left(x) => x.poll(cx),
Either::Right(x) => x.poll(cx),
}
}
}

View file

@ -21,6 +21,7 @@ mod then;
mod transform;
mod transform_err;
mod either;
mod lazy;
mod ready;
@ -341,6 +342,7 @@ where
}
pub mod util {
pub use crate::either::Either;
pub use crate::lazy::{lazy, Lazy};
pub use crate::ready::Ready;
}

View file

@ -1,10 +1,12 @@
# Changes
## [0.3.14] - 2021-04-xx
## [0.3.14] - 2021-04-03
* server: prevent double socket registration if accept loop is in back-pressure state
* util: add custom Ready future
* util: add custom Ready, Either future and several helper functions
* reduce futures crate dependencies
## [0.3.13] - 2021-03-26

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.3.13"
version = "0.3.14"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"
@ -36,10 +36,10 @@ compress = ["flate2", "brotli2"]
cookie = ["coo-kie", "coo-kie/percent-encode"]
[dependencies]
ntex-codec = "0.4.0"
ntex-rt = "0.2.1"
ntex-codec = "0.4.1"
ntex-rt = "0.2.2"
ntex-router = "0.4.2"
ntex-service = "0.1.6"
ntex-service = "0.1.7"
ntex-macros = "0.1.3"
base64 = "0.13"
@ -47,20 +47,21 @@ bitflags = "1.2"
bytes = "1.0"
bytestring = { version = "1.0", features = ["serde"] }
derive_more = "0.99.13"
either = "1.6.1"
encoding_rs = "0.8"
futures = "0.3.13"
futures-core = { version = "0.3.13", default-features = false, features = ["alloc"] }
futures-sink = { version = "0.3.13", default-features = false, features = ["alloc"] }
ahash = "0.7.2"
h2 = "0.3"
http = "0.2"
httparse = "1.3"
httpdate = "1.0"
log = "0.4"
mime = "0.3"
mio = "0.7.10"
num_cpus = "1.13"
nanorand = "0.5"
percent-encoding = "2.1"
pin-project-lite = "0.2"
rand = "0.8"
regex = "1.4"
sha-1 = "0.9"
slab = "0.4"
@ -70,8 +71,7 @@ serde_urlencoded = "0.7"
socket2 = "0.4"
url = "2.1"
coo-kie = { version = "0.15", package = "cookie", optional = true }
time = { version = "0.2", default-features = false, features = ["std"] }
tokio = { version = "1", default-features=false }
tokio = { version = "1", default-features=false, features = ["sync"] }
# resolver
trust-dns-proto = { version = "0.20", default-features = false }
@ -93,7 +93,9 @@ flate2 = { version = "1.0.20", optional = true }
[dev-dependencies]
env_logger = "0.8"
serde_derive = "1.0"
rand = "0.8"
time = "0.2"
open-ssl = { version="0.10", package = "openssl" }
rust-tls = { version = "0.19", package="rustls", features = ["dangerous_configuration"] }
webpki = "0.21"
futures = "0.3.13"

View file

@ -1,6 +1,4 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{future::Future, pin::Pin, task::Context, task::Poll};
use slab::Slab;
@ -106,7 +104,7 @@ impl Drop for Waiter {
#[cfg(test)]
mod tests {
use super::*;
use futures::future::lazy;
use crate::util::lazy;
#[crate::rt_test]
#[allow(clippy::unit_cmp)]

View file

@ -5,3 +5,16 @@ pub mod condition;
pub mod mpsc;
pub mod oneshot;
pub mod pool;
/// Error returned from a [`Receiver`](Receiver) when the corresponding
/// [`Sender`](Sender) is dropped.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct Canceled;
impl std::fmt::Display for Canceled {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::write!(f, "oneshot canceled")
}
}
impl std::error::Error for Canceled {}

View file

@ -1,11 +1,8 @@
//! A multi-producer, single-consumer, futures-aware, FIFO queue.
use std::collections::VecDeque;
use std::error::Error;
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{collections::VecDeque, fmt, pin::Pin, task::Context, task::Poll};
use futures::{Sink, Stream};
use futures_core::Stream;
use futures_sink::Sink;
use super::cell::{Cell, WeakCell};
use crate::task::LocalWaker;
@ -209,7 +206,7 @@ impl<T> Drop for Receiver<T> {
/// dropped
pub struct SendError<T>(T);
impl<T> Error for SendError<T> {}
impl<T> std::error::Error for SendError<T> {}
impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
@ -233,8 +230,8 @@ impl<T> SendError<T> {
#[cfg(test)]
mod tests {
use super::*;
use futures::future::lazy;
use futures::{Sink, Stream, StreamExt};
use crate::{util::lazy, util::next, Stream};
use futures_sink::Sink;
#[crate::rt_test]
async fn test_mpsc() {
@ -243,11 +240,11 @@ mod tests {
assert!(format!("{:?}", rx).contains("Receiver"));
tx.send("test").unwrap();
assert_eq!(rx.next().await.unwrap(), "test");
assert_eq!(next(&mut rx).await.unwrap(), "test");
let tx2 = tx.clone();
tx2.send("test2").unwrap();
assert_eq!(rx.next().await.unwrap(), "test2");
assert_eq!(next(&mut rx).await.unwrap(), "test2");
assert_eq!(
lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await,
@ -262,7 +259,7 @@ mod tests {
let (tx, mut rx) = channel::<String>();
tx.close();
assert_eq!(rx.next().await, None);
assert_eq!(next(&mut rx).await, None);
let (tx, _rx) = channel::<String>();
let weak_tx = tx.downgrade();
@ -295,8 +292,8 @@ mod tests {
assert!(Pin::new(&mut tx).poll_close(cx).is_ready());
})
.await;
assert_eq!(rx.next().await.unwrap(), "test");
assert_eq!(rx.next().await, None);
assert_eq!(next(&mut rx).await.unwrap(), "test");
assert_eq!(next(&mut rx).await, None);
}
#[crate::rt_test]

View file

@ -1,11 +1,7 @@
//! A one-shot, futures-aware channel.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{future::Future, pin::Pin, task::Context, task::Poll};
pub use futures::channel::oneshot::Canceled;
use super::cell::Cell;
use super::{cell::Cell, Canceled};
use crate::task::LocalWaker;
/// Creates a new futures-aware, one-shot channel.
@ -105,7 +101,7 @@ impl<T> Future for Receiver<T> {
#[cfg(test)]
mod tests {
use super::*;
use futures::future::lazy;
use crate::util::lazy;
#[crate::rt_test]
async fn test_oneshot() {

View file

@ -1,12 +1,9 @@
//! A one-shot pool, futures-aware channel.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{future::Future, pin::Pin, task::Context, task::Poll};
pub use futures::channel::oneshot::Canceled;
use slab::Slab;
use super::cell::Cell;
use super::{cell::Cell, Canceled};
use crate::task::LocalWaker;
/// Creates a new futures-aware, pool of one-shot's.
@ -179,7 +176,7 @@ impl<T> Future for Receiver<T> {
#[cfg(test)]
mod tests {
use super::*;
use futures::future::lazy;
use crate::util::lazy;
#[crate::rt_test]
async fn test_pool() {

View file

@ -3,7 +3,7 @@ use std::fmt;
use std::iter::{FromIterator, FusedIterator};
use std::net::SocketAddr;
use either::Either;
use crate::util::Either;
/// Connect request
pub trait Address: Unpin + 'static {

View file

@ -3,11 +3,9 @@ use std::{
task::Context, task::Poll,
};
use futures::future::Either;
use super::{default_resolver, Address, Connect, ConnectError, DnsResolver};
use crate::service::{Service, ServiceFactory};
use crate::util::Ready;
use crate::util::{Either, Ready};
/// DNS Resolver Service
pub struct Resolver<T> {
@ -42,7 +40,7 @@ impl<T: Address> Resolver<T> {
if req.addr.is_some() || req.req.addr().is_some() {
Either::Right(Ready::ok(req))
} else if let Ok(ip) = req.host().parse() {
req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port())));
req.addr = Some(Either::Left(SocketAddr::new(ip, req.port())));
Either::Right(Ready::ok(req))
} else {
trace!("DNS resolver: resolving host {:?}", req.host());
@ -138,9 +136,8 @@ impl<T: Address> Service for Resolver<T> {
#[cfg(test)]
mod tests {
use futures::future::lazy;
use super::*;
use crate::util::lazy;
#[crate::rt_test]
async fn resolver() {

View file

@ -5,11 +5,10 @@ use std::{
time::Instant,
};
use either::Either;
use crate::codec::{AsyncRead, AsyncWrite, Decoder, Encoder};
use crate::framed::{DispatchItem, Read, ReadTask, State, Timer, Write, WriteTask};
use crate::service::{IntoService, Service};
use crate::util::Either;
type Response<U> = <U as Encoder>::Item;

View file

@ -2,12 +2,11 @@
use std::task::{Context, Poll, Waker};
use std::{cell::Cell, cell::RefCell, future::Future, hash, io, mem, pin::Pin, rc::Rc};
use futures::future::poll_fn;
use slab::Slab;
use crate::codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts};
use crate::task::LocalWaker;
use crate::util::{Buf, BytesMut, Either};
use crate::util::{poll_fn, Buf, BytesMut, Either};
bitflags::bitflags! {
pub struct Flags: u16 {
@ -1110,12 +1109,9 @@ impl Drop for OnDisconnect {
#[cfg(test)]
mod tests {
use bytes::Bytes;
use futures::future::lazy;
use crate::codec::BytesCodec;
use crate::testing::Io;
use super::*;
use crate::{codec::BytesCodec, testing::Io, util::lazy};
const BIN: &[u8] = b"GET /test HTTP/1\r\n\r\n";
const TEXT: &str = "GET /test HTTP/1\r\n\r\n";

View file

@ -165,7 +165,9 @@ where
// disconnect timeout
if let Some(ref mut delay) = delay {
futures::ready!(Pin::new(delay).poll(cx));
if let Poll::Pending = Pin::new(delay).poll(cx) {
return Poll::Pending;
}
}
this.state.set_wr_shutdown_complete();
log::trace!("write task is stopped after delay");

View file

@ -3,7 +3,7 @@ use std::{
};
use bytes::{Bytes, BytesMut};
use futures::Stream;
use futures_core::Stream;
#[derive(Debug, PartialEq, Copy, Clone)]
/// Body size hint
@ -509,11 +509,11 @@ where
#[cfg(test)]
mod tests {
use futures::{future::poll_fn, stream};
use futures::stream;
use std::io;
use super::*;
use crate::util::Ready;
use crate::util::{poll_fn, Ready};
impl Body {
pub(crate) fn get_ref(&self) -> &[u8] {

View file

@ -1,7 +1,6 @@
use std::{fmt, future::Future, pin::Pin, time};
use bytes::Bytes;
use futures::future::Either;
use h2::client::SendRequest;
use crate::codec::{AsyncRead, AsyncWrite, Framed};
@ -10,7 +9,7 @@ use crate::http::h1::ClientCodec;
use crate::http::message::{RequestHeadType, ResponseHead};
use crate::http::payload::Payload;
use crate::http::Protocol;
use crate::util::Ready;
use crate::util::{Either, Ready};
use super::error::SendRequestError;
use super::pool::Acquired;

View file

@ -1,13 +1,11 @@
use std::{rc::Rc, task::Context, task::Poll, time::Duration};
use futures::future::Either;
use crate::codec::{AsyncRead, AsyncWrite};
use crate::connect::{self, Connect as TcpConnect, Connector as TcpConnector};
use crate::http::{Protocol, Uri};
use crate::service::{apply_fn, boxed, Service};
use crate::util::timeout::{TimeoutError, TimeoutService};
use crate::util::Ready;
use crate::util::{Either, Ready};
use super::connection::Connection;
use super::error::ConnectError;
@ -83,7 +81,7 @@ impl Connector {
let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap();
let _ = ssl
.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(|e| error!("Can not set ALPN protocol: {:?}", e));
.map_err(|e| error!("Cannot set ALPN protocol: {:?}", e));
conn.openssl(ssl.build())
}
#[cfg(all(not(feature = "openssl"), feature = "rustls"))]
@ -366,7 +364,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use futures::future::lazy;
use crate::util::lazy;
#[crate::rt_test]
async fn test_readiness() {

View file

@ -2,7 +2,6 @@
use std::{error::Error, io};
use derive_more::{Display, From};
use either::Either;
use serde_json::error::Error as JsonError;
#[cfg(feature = "openssl")]
@ -12,6 +11,7 @@ use crate::connect::ResolveError;
use crate::http::error::{HttpError, ParseError, PayloadError};
use crate::http::header::HeaderValue;
use crate::http::StatusCode;
use crate::util::Either;
use crate::ws::ProtocolError;
/// Websocket client error

View file

@ -1,7 +1,7 @@
use std::{convert::TryFrom, error::Error, fmt, net, rc::Rc, time::Duration};
use bytes::Bytes;
use futures::Stream;
use futures_core::Stream;
use serde::Serialize;
use crate::http::body::Body;

View file

@ -1,7 +1,6 @@
use std::{io, io::Write, pin::Pin, task::Context, task::Poll, time};
use bytes::{BufMut, Bytes, BytesMut};
use futures::{future::poll_fn, SinkExt, Stream, StreamExt};
use crate::codec::{AsyncRead, AsyncWrite, Framed, ReadBuf};
use crate::http::body::{BodySize, MessageBody};
@ -10,6 +9,8 @@ use crate::http::h1;
use crate::http::header::{HeaderMap, HeaderValue, HOST};
use crate::http::message::{RequestHeadType, ResponseHead};
use crate::http::payload::{Payload, PayloadStream};
use crate::util::{next, poll_fn, send};
use crate::{Sink, Stream};
use super::connection::{ConnectionLifetime, ConnectionType, IoConnection};
use super::error::{ConnectError, SendRequestError};
@ -48,7 +49,7 @@ where
headers.insert(HOST, value)
}
},
Err(e) => log::error!("Can not set HOST header {}", e),
Err(e) => log::error!("Cannot set HOST header {}", e),
}
}
}
@ -61,7 +62,7 @@ where
// create Framed and send request
let mut framed = Framed::new(io, h1::ClientCodec::default());
framed.send((head, body.size()).into()).await?;
send(&mut framed, (head, body.size()).into()).await?;
// send request body
match body.size() {
@ -70,10 +71,8 @@ where
};
// read response and init read body
let res = framed.into_future().await;
let (head, framed) = if let (Some(result), framed) = res {
let item = result.map_err(SendRequestError::from)?;
(item, framed)
let head = if let Some(result) = next(&mut framed).await {
result.map_err(SendRequestError::from)?
} else {
return Err(SendRequestError::from(ConnectError::Disconnected));
};
@ -85,7 +84,7 @@ where
Ok((head, Payload::None))
}
_ => {
let pl: PayloadStream = PlStream::new(framed).boxed_local();
let pl: PayloadStream = Box::pin(PlStream::new(framed));
Ok((head, pl.into()))
}
}
@ -100,10 +99,10 @@ where
{
// create Framed and send request
let mut framed = Framed::new(io, h1::ClientCodec::default());
framed.send((head, BodySize::None).into()).await?;
send(&mut framed, (head, BodySize::None).into()).await?;
// read response
if let (Some(result), framed) = framed.into_future().await {
if let Some(result) = next(&mut framed).await {
let head = result.map_err(SendRequestError::from)?;
Ok((head, framed))
} else {
@ -150,7 +149,8 @@ where
}
}
SinkExt::flush(framed).await?;
poll_fn(|cx| Pin::new(&mut *framed).poll_flush(cx)).await?;
Ok(())
}

View file

@ -1,7 +1,6 @@
use std::{convert::TryFrom, time};
use bytes::Bytes;
use futures::future::poll_fn;
use h2::{client::SendRequest, SendStream};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING};
use http::{request::Request, Method, Version};
@ -11,6 +10,7 @@ use crate::http::body::{BodySize, MessageBody};
use crate::http::header::HeaderMap;
use crate::http::message::{RequestHeadType, ResponseHead};
use crate::http::payload::Payload;
use crate::util::poll_fn;
use super::connection::{ConnectionType, IoConnection};
use super::error::SendRequestError;

View file

@ -2,7 +2,6 @@ use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::{cell::RefCell, collections::VecDeque, future::Future, pin::Pin, rc::Rc};
use futures::future::poll_fn;
use h2::client::{handshake, Connection, SendRequest};
use http::uri::Authority;
@ -12,7 +11,7 @@ use crate::http::Protocol;
use crate::rt::{spawn, time::sleep, time::Sleep};
use crate::service::Service;
use crate::task::LocalWaker;
use crate::util::{Bytes, HashMap};
use crate::util::{poll_fn, Bytes, HashMap};
use super::connection::{ConnectionType, IoConnection};
use super::error::ConnectError;
@ -609,18 +608,14 @@ impl<T> Drop for Acquired<T> {
#[cfg(test)]
mod tests {
use futures::future::lazy;
use std::cell::RefCell;
use std::convert::TryFrom;
use std::rc::Rc;
use std::time::Duration;
use std::{cell::RefCell, convert::TryFrom, rc::Rc, time::Duration};
use super::*;
use crate::http::client::Connection;
use crate::http::Uri;
use crate::rt::time::sleep;
use crate::service::fn_service;
use crate::testing::Io;
use crate::{
http::client::Connection, http::Uri, service::fn_service, testing::Io,
util::lazy,
};
#[crate::rt_test]
async fn test_basics() {

View file

@ -1,7 +1,7 @@
use std::{convert::TryFrom, error::Error, fmt, net, rc::Rc, time::Duration};
use bytes::Bytes;
use futures::Stream;
use futures_core::Stream;
use serde::Serialize;
#[cfg(feature = "cookie")]

View file

@ -3,7 +3,7 @@ use std::task::{Context, Poll};
use std::{fmt, future::Future, marker::PhantomData, mem, pin::Pin};
use bytes::{Bytes, BytesMut};
use futures::Stream;
use futures_core::Stream;
use serde::de::DeserializeOwned;
#[cfg(feature = "cookie")]

View file

@ -3,7 +3,7 @@ use std::{convert::TryFrom, error::Error, future::Future, net, pin::Pin, time};
use bytes::Bytes;
use derive_more::From;
use futures::Stream;
use futures_core::Stream;
use serde::Serialize;
use crate::http::body::{Body, BodyStream};
@ -82,7 +82,10 @@ impl Future for SendClientRequest {
}
}
let res = futures::ready!(Pin::new(send).poll(cx));
let res = match Pin::new(send).poll(cx) {
Poll::Ready(res) => res,
Poll::Pending => return Poll::Pending,
};
#[cfg(feature = "compress")]
let res = res.map(|mut res| {

View file

@ -61,7 +61,7 @@ impl TestResponse {
return self;
}
}
panic!("Can not create header");
panic!("Cannot create header");
}
#[cfg(feature = "cookie")]

View file

@ -3,15 +3,15 @@ use std::{convert::TryFrom, fmt, net::SocketAddr, rc::Rc, str};
#[cfg(feature = "cookie")]
use coo_kie::{Cookie, CookieJar};
use futures::future::{Either, TryFutureExt};
use nanorand::{WyRand, RNG};
use crate::codec::{AsyncRead, AsyncWrite, Framed};
use crate::framed::{DispatchItem, Dispatcher, State};
use crate::http::error::HttpError;
use crate::http::header::{self, HeaderName, HeaderValue, AUTHORIZATION};
use crate::http::{
error::HttpError, ConnectionType, Payload, RequestHead, StatusCode, Uri,
};
use crate::http::{ConnectionType, Payload, RequestHead, StatusCode, Uri};
use crate::service::{apply_fn, into_service, IntoService, Service};
use crate::util::Either;
use crate::{channel::mpsc, rt, rt::time::timeout, util::sink, util::Ready, ws};
pub use crate::ws::{CloseCode, CloseReason, Frame, Message};
@ -295,7 +295,8 @@ impl WsRequest {
// Generate a random key for the `Sec-WebSocket-Key` header.
// a base64-encoded (see Section 4 of [RFC4648]) value that,
// when decoded, is 16 bytes in length (RFC 6455)
let sec_key: [u8; 16] = rand::random();
let mut sec_key: [u8; 16] = [0; 16];
WyRand::new().fill(&mut sec_key);
let key = base64::encode(&sec_key);
self.head.headers.insert(
@ -444,7 +445,8 @@ where
if let Err(err) = self
.start(into_service(move |item| {
srv.call(Ok::<_, ws::WsError<()>>(item)).map_err(|_| ())
let fut = srv.call(Ok::<_, ws::WsError<()>>(item));
async move { fut.await.map_err(|_| ()) }
}))
.await
{

View file

@ -1,12 +1,10 @@
use std::{cell::Cell, cell::RefCell, ptr::copy_nonoverlapping, rc::Rc, time::Duration};
use bytes::BytesMut;
use time::OffsetDateTime;
use std::{cell::Cell, cell::RefCell, ptr::copy_nonoverlapping, rc::Rc, time};
use crate::framed::Timer;
use crate::http::{Request, Response};
use crate::rt::time::{sleep, sleep_until, Instant, Sleep};
use crate::service::boxed::BoxService;
use crate::util::BytesMut;
#[derive(Debug, PartialEq, Clone, Copy)]
/// Server keep-alive setting
@ -106,7 +104,7 @@ pub(super) struct DispatcherConfig<T, S, X, U> {
pub(super) service: S,
pub(super) expect: X,
pub(super) upgrade: Option<U>,
pub(super) keep_alive: Duration,
pub(super) keep_alive: time::Duration,
pub(super) client_timeout: u64,
pub(super) client_disconnect: u64,
pub(super) ka_enabled: bool,
@ -131,7 +129,7 @@ impl<T, S, X, U> DispatcherConfig<T, S, X, U> {
expect,
upgrade,
on_request,
keep_alive: Duration::from_secs(cfg.0.keep_alive),
keep_alive: time::Duration::from_secs(cfg.0.keep_alive),
client_timeout: cfg.0.client_timeout,
client_disconnect: cfg.0.client_disconnect,
ka_enabled: cfg.0.ka_enabled,
@ -207,7 +205,7 @@ impl DateServiceInner {
self.current_time.set(Instant::now());
let mut bytes = DATE_VALUE_DEFAULT;
let dt = OffsetDateTime::now_utc().format("%a, %d %b %Y %H:%M:%S GMT");
let dt = httpdate::HttpDate::from(time::SystemTime::now()).to_string();
bytes[6..35].copy_from_slice(dt.as_ref());
self.current_date.set(bytes);
}
@ -225,7 +223,7 @@ impl DateService {
// periodic date update
let s = self.clone();
crate::rt::spawn(async move {
sleep(Duration::from_millis(500)).await;
sleep(time::Duration::from_millis(500)).await;
s.0.current.set(false);
});
}

View file

@ -1,12 +1,9 @@
use std::future::Future;
use std::io::{self, Write};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{future::Future, io, io::Write, pin::Pin, task::Context, task::Poll};
use brotli2::write::BrotliDecoder;
use bytes::Bytes;
use flate2::write::{GzDecoder, ZlibDecoder};
use futures::{ready, Stream};
use futures_core::Stream;
use super::Writer;
use crate::http::error::PayloadError;
@ -79,10 +76,11 @@ where
) -> Poll<Option<Self::Item>> {
loop {
if let Some(ref mut fut) = self.fut {
let (chunk, decoder) = match ready!(Pin::new(fut).poll(cx)) {
Ok(Ok(item)) => item,
Ok(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Err(e) => return Poll::Ready(Some(Err(e.into()))),
let (chunk, decoder) = match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(Ok(item))) => item,
Poll::Ready(Ok(Err(e))) => return Poll::Ready(Some(Err(e.into()))),
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Poll::Pending => return Poll::Pending,
};
self.decoder = Some(decoder);
self.fut.take();

View file

@ -1,14 +1,9 @@
//! Stream encoder
use std::error::Error;
use std::future::Future;
use std::io::{self, Write};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{future::Future, io, io::Write, pin::Pin, task::Context, task::Poll};
use brotli2::write::BrotliEncoder;
use bytes::Bytes;
use flate2::write::{GzEncoder, ZlibEncoder};
use futures::ready;
use crate::http::body::{Body, BodySize, MessageBody, ResponseBody};
use crate::http::header::{ContentEncoding, HeaderValue, CONTENT_ENCODING};
@ -88,22 +83,25 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
fn poll_next_chunk(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
) -> Poll<Option<Result<Bytes, Box<dyn std::error::Error>>>> {
loop {
if self.eof {
return Poll::Ready(None);
}
if let Some(ref mut fut) = self.fut {
let mut encoder = match ready!(Pin::new(fut).poll(cx)) {
Ok(Ok(item)) => item,
Ok(Err(e)) => return Poll::Ready(Some(Err(Box::new(e)))),
Err(_) => {
let mut encoder = match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(Ok(item))) => item,
Poll::Ready(Ok(Err(e))) => {
return Poll::Ready(Some(Err(Box::new(e))))
}
Poll::Ready(Err(_)) => {
return Poll::Ready(Some(Err(Box::new(io::Error::new(
io::ErrorKind::Other,
"Canceled",
)))));
}
Poll::Pending => return Poll::Pending,
};
let chunk = encoder.take();
self.encoder = Some(encoder);

View file

@ -1,16 +1,16 @@
//! Http related errors
use std::{fmt, io, io::Write, str::Utf8Error, string::FromUtf8Error};
use either::Either;
use http::{header, uri::InvalidUri, StatusCode};
// re-export for convinience
pub use futures::channel::oneshot::Canceled;
pub use crate::channel::Canceled;
pub use http::Error as HttpError;
use crate::http::body::Body;
use crate::http::response::Response;
use crate::rt::task::JoinError;
use crate::util::Either;
/// Error that can be converted to `Response`
pub trait ResponseError: fmt::Display + fmt::Debug {
@ -128,7 +128,7 @@ pub enum PayloadError {
)]
Incomplete(Option<io::Error>),
/// Content encoding stream corruption
#[display(fmt = "Can not decode content-encoding.")]
#[display(fmt = "Cannot decode content-encoding.")]
EncodingCorrupted,
/// A payload reached size limit.
#[display(fmt = "A payload reached size limit.")]
@ -219,8 +219,8 @@ impl std::error::Error for DispatchError {}
/// A set of error that can occure during parsing content type
#[derive(PartialEq, Debug, Display)]
pub enum ContentTypeError {
/// Can not parse content type
#[display(fmt = "Can not parse content type")]
/// Cannot parse content type
#[display(fmt = "Cannot parse content type")]
ParseError,
/// Unknown content encoding
#[display(fmt = "Unknown content encoding")]

View file

@ -734,18 +734,14 @@ mod tests {
use std::{cell::Cell, io, sync::Arc};
use bytes::{Bytes, BytesMut};
use futures::future::{err, lazy, ok, FutureExt};
use futures::StreamExt;
use rand::Rng;
use super::*;
use crate::codec::Decoder;
use crate::http::config::{DispatcherConfig, ServiceConfig};
use crate::http::h1::{ClientCodec, ExpectHandler, UpgradeHandler};
use crate::http::{body, Request, ResponseHead, StatusCode};
use crate::rt::time::sleep;
use crate::service::{boxed, fn_service, IntoService};
use crate::testing::Io;
use crate::{codec::Decoder, rt::time::sleep, testing::Io, util::lazy, util::next};
const BUFFER_SIZE: usize = 32_768;
@ -815,12 +811,14 @@ mod tests {
server,
Rc::new(DispatcherConfig::new(
ServiceConfig::default(),
fn_service(|_| ok::<_, io::Error>(Response::Ok().finish())),
fn_service(|_| {
Box::pin(async { Ok::<_, io::Error>(Response::Ok().finish()) })
}),
ExpectHandler,
None,
Some(boxed::service(crate::into_service(move |(req, _)| {
data2.set(true);
ok(req)
Box::pin(async move { Ok(req) })
}))),
)),
None,
@ -842,7 +840,9 @@ mod tests {
client.remote_buffer_cap(1024);
client.write("GET /test HTTP/1\r\n\r\n");
let mut h1 = h1(server, |_| ok::<_, io::Error>(Response::Ok().finish()));
let mut h1 = h1(server, |_| {
Box::pin(async { Ok::<_, io::Error>(Response::Ok().finish()) })
});
sleep(time::Duration::from_millis(50)).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready());
@ -863,7 +863,9 @@ mod tests {
let (client, server) = Io::create();
client.remote_buffer_cap(4096);
let mut decoder = ClientCodec::default();
spawn_h1(server, |_| ok::<_, io::Error>(Response::Ok().finish()));
spawn_h1(server, |_| async {
Ok::<_, io::Error>(Response::Ok().finish())
});
client.write("GET /test HTTP/1.1\r\n\r\n");
@ -891,7 +893,7 @@ mod tests {
let mut decoder = ClientCodec::default();
spawn_h1(server, |mut req: Request| async move {
let mut p = req.take_payload();
while let Some(_) = p.next().await {}
while let Some(_) = next(&mut p).await {}
Ok::<_, io::Error>(Response::Ok().finish())
});
@ -963,7 +965,7 @@ mod tests {
let (client, server) = Io::create();
spawn_h1(server, move |_| {
num2.fetch_add(1, Ordering::Relaxed);
ok::<_, io::Error>(Response::Ok().finish())
async { Ok::<_, io::Error>(Response::Ok().finish()) }
});
client.remote_buffer_cap(1024);
@ -983,7 +985,9 @@ mod tests {
let (client, server) = Io::create();
client.remote_buffer_cap(4096);
let mut h1 = h1(server, |_| ok::<_, io::Error>(Response::Ok().finish()));
let mut h1 = h1(server, |_| {
Box::pin(async { Ok::<_, io::Error>(Response::Ok().finish()) })
});
h1.inner.state.set_buffer_params(16 * 1024, 16 * 1024, 1024);
let mut decoder = ClientCodec::default();
@ -1018,7 +1022,7 @@ mod tests {
async move {
// read one chunk
let mut pl = req.take_payload();
let _ = pl.next().await.unwrap().unwrap();
let _ = next(&mut pl).await.unwrap().unwrap();
m.store(true, Ordering::Relaxed);
// sleep
sleep(time::Duration::from_secs(999_999)).await;
@ -1071,8 +1075,9 @@ mod tests {
let (client, server) = Io::create();
let mut h1 = h1(server, move |_| {
let n = num2.clone();
async move { Ok::<_, io::Error>(Response::Ok().message_body(Stream(n.clone()))) }
.boxed_local()
Box::pin(async move {
Ok::<_, io::Error>(Response::Ok().message_body(Stream(n.clone())))
})
});
let state = h1.inner.state.clone();
@ -1128,7 +1133,9 @@ mod tests {
let (client, server) = Io::create();
client.remote_buffer_cap(4096);
let mut h1 = h1(server, |_| {
ok::<_, io::Error>(Response::Ok().message_body(Stream(false)))
Box::pin(async {
Ok::<_, io::Error>(Response::Ok().message_body(Stream(false)))
})
});
client.write("GET /test HTTP/1.1\r\n\r\n");
@ -1155,7 +1162,9 @@ mod tests {
client.write("GET /test HTTP/1.1\r\ncontent-length:512\r\n\r\n");
let mut h1 = h1(server, |_| {
err::<Response<()>, _>(io::Error::new(io::ErrorKind::Other, "error"))
Box::pin(async {
Err::<Response<()>, _>(io::Error::new(io::ErrorKind::Other, "error"))
})
});
sleep(time::Duration::from_millis(50)).await;

View file

@ -1,12 +1,10 @@
//! Payload stream
use std::cell::RefCell;
use std::collections::VecDeque;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::task::{Context, Poll};
use std::{cell::RefCell, collections::VecDeque, pin::Pin};
use bytes::Bytes;
use futures::Stream;
use futures_core::Stream;
use crate::http::error::PayloadError;
use crate::task::LocalWaker;
@ -210,7 +208,7 @@ impl Inner {
#[cfg(test)]
mod tests {
use super::*;
use futures::future::poll_fn;
use crate::util::poll_fn;
#[crate::rt_test]
async fn test_unread_data() {

View file

@ -1,11 +1,8 @@
use std::{io, marker::PhantomData, task::Context, task::Poll};
use futures::future::Ready;
use crate::framed::State;
use crate::http::h1::Codec;
use crate::http::request::Request;
use crate::{Service, ServiceFactory};
use crate::{framed::State, util::Ready, Service, ServiceFactory};
pub struct UpgradeHandler<T>(PhantomData<T>);
@ -16,7 +13,7 @@ impl<T> ServiceFactory for UpgradeHandler<T> {
type Error = io::Error;
type Service = UpgradeHandler<T>;
type InitError = io::Error;
type Future = Ready<Result<Self::Service, Self::InitError>>;
type Future = Ready<Self::Service, Self::InitError>;
#[inline]
fn new_service(&self, _: ()) -> Self::Future {
@ -28,7 +25,7 @@ impl<T> Service for UpgradeHandler<T> {
type Request = (Request, T, State, Codec);
type Response = ();
type Error = io::Error;
type Future = Ready<Result<Self::Response, Self::Error>>;
type Future = Ready<Self::Response, Self::Error>;
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {

View file

@ -3,7 +3,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::Bytes;
use futures::Stream;
use futures_core::Stream;
use h2::RecvStream;
mod dispatcher;

View file

@ -1,7 +1,7 @@
use std::{fmt, mem, pin::Pin, task::Context, task::Poll};
use bytes::Bytes;
use futures::Stream;
use futures_core::Stream;
use h2::RecvStream;
use super::error::PayloadError;

View file

@ -2,7 +2,6 @@
use std::{cell::Ref, cell::RefMut, convert::TryFrom, error::Error, fmt, str};
use bytes::{Bytes, BytesMut};
use futures::Stream;
use serde::Serialize;
#[cfg(feature = "cookie")]
@ -13,7 +12,7 @@ use crate::http::error::{HttpError, ResponseError};
use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue};
use crate::http::message::{ConnectionType, Message, ResponseHead};
use crate::http::StatusCode;
use crate::util::Extensions;
use crate::{util::Extensions, Stream};
/// An HTTP Response
pub struct Response<B = Body> {

View file

@ -112,7 +112,7 @@ impl TestRequest {
return self;
}
}
panic!("Can not create header");
panic!("Cannot create header");
}
#[cfg(feature = "cookie")]
@ -242,7 +242,7 @@ pub fn server<F: StreamServiceFactory<TcpStream>>(factory: F) -> TestServer {
builder.set_verify(SslVerifyMode::NONE);
let _ = builder
.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(|e| log::error!("Can not set alpn protocol: {:?}", e));
.map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e));
Connector::default()
.timeout(time::Duration::from_millis(30000))
.openssl(builder.build())

View file

@ -46,6 +46,9 @@ pub mod ws;
pub use self::service::*;
pub use futures_core::stream::Stream;
pub use futures_sink::Sink;
pub mod codec {
//! Utilities for encoding and decoding frames.
pub use ntex_codec::*;

View file

@ -55,13 +55,13 @@ impl AcceptLoop {
pub(super) fn new(srv: Server) -> AcceptLoop {
// Create a poll instance
let poll = mio::Poll::new()
.map_err(|e| panic!("Can not create mio::Poll {}", e))
.map_err(|e| panic!("Cannot create mio::Poll {}", e))
.unwrap();
let (tx, rx) = sync_mpsc::channel();
let waker = Arc::new(
mio::Waker::new(poll.registry(), NOTIFY)
.map_err(|e| panic!("Can not create mio::Waker {}", e))
.map_err(|e| panic!("Cannot create mio::Waker {}", e))
.unwrap(),
);
let notify = AcceptNotify::new(waker, tx);
@ -159,7 +159,7 @@ impl Accept {
mio::Token(token + DELTA),
mio::Interest::READABLE,
) {
panic!("Can not register io: {}", err);
panic!("Cannot register io: {}", err);
}
entry.insert(ServerSocketInfo {
@ -231,7 +231,7 @@ impl Accept {
mio::Token(token + DELTA),
mio::Interest::READABLE,
) {
error!("Can not register server socket {}", err);
error!("Cannot register server socket {}", err);
} else {
info!("Resume accepting connections on {}", info.addr);
}
@ -253,7 +253,7 @@ impl Accept {
if let Err(err) =
self.poll.registry().deregister(&mut info.sock)
{
error!("Can not deregister server socket {}", err);
error!("Cannot deregister server socket {}", err);
} else {
info!("Paused accepting connections on {}", info.addr);
}
@ -266,7 +266,7 @@ impl Accept {
mio::Token(token + DELTA),
mio::Interest::READABLE,
) {
error!("Can not resume socket accept process: {}", err);
error!("Cannot resume socket accept process: {}", err);
} else {
info!(
"Accepting connections on {} has been resumed",
@ -321,7 +321,7 @@ impl Accept {
mio::Token(token + DELTA),
mio::Interest::READABLE,
) {
error!("Can not resume socket accept process: {}", err);
error!("Cannot resume socket accept process: {}", err);
} else {
info!("Accepting connections on {} has been resumed", info.addr);
}
@ -412,7 +412,7 @@ impl Accept {
error!("Error accepting connection: {}", e);
if let Err(err) = self.poll.registry().deregister(&mut info.sock)
{
error!("Can not deregister server socket {}", err);
error!("Cannot deregister server socket {}", err);
}
// sleep after error

View file

@ -1,16 +1,13 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::{io, mem, net};
use std::{future::Future, io, mem, net, pin::Pin, time::Duration};
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::channel::oneshot;
use futures::stream::FuturesUnordered;
use futures::{ready, Future, Stream, StreamExt};
use log::{error, info};
use socket2::{Domain, SockAddr, Socket, Type};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::oneshot;
use crate::rt::{net::TcpStream, spawn, time::sleep, System};
use crate::util::join_all;
use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::config::{ConfiguredService, ServiceConfig};
@ -48,7 +45,7 @@ impl Default for ServerBuilder {
impl ServerBuilder {
/// Create new Server builder instance
pub fn new() -> ServerBuilder {
let (tx, rx) = unbounded();
let (tx, rx) = unbounded_channel();
let server = Server::new(tx);
ServerBuilder {
@ -368,15 +365,14 @@ impl ServerBuilder {
// stop workers
if !self.workers.is_empty() && graceful {
let fut = self
let futs: Vec<_> = self
.workers
.iter()
.map(move |worker| worker.1.stop(graceful))
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>();
.collect();
spawn(async move {
let _ = fut.await;
let _ = join_all(futs).await;
if let Some(tx) = completion {
let _ = tx.send(());
@ -443,11 +439,10 @@ impl Future for ServerBuilder {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match ready!(Pin::new(&mut self.cmd).poll_next(cx)) {
Some(it) => self.as_mut().get_mut().handle_cmd(it),
None => {
return Poll::Pending;
}
match Pin::new(&mut self.cmd).poll_recv(cx) {
Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it),
Poll::Ready(None) => return Poll::Pending,
Poll::Pending => return Poll::Pending,
}
}
}
@ -476,7 +471,7 @@ pub(super) fn bind_addr<S: net::ToSocketAddrs>(
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"Can not bind to address.",
"Cannot bind to address.",
))
}
} else {
@ -513,7 +508,6 @@ mod tests {
#[cfg(unix)]
#[crate::rt_test]
async fn test_signals() {
use futures::future::ok;
use std::sync::mpsc;
use std::{net, thread, time};
@ -525,7 +519,9 @@ mod tests {
crate::server::build()
.workers(1)
.disable_signals()
.bind("test", addr, move || fn_service(|_| ok::<_, ()>(())))
.bind("test", addr, move || {
fn_service(|_| async { Ok::<_, ()>(()) })
})
.unwrap()
.start()
});

View file

@ -155,7 +155,7 @@ impl InternalServiceFactory for ConfiguredService {
res.push((token, serv));
}
Err(_) => {
error!("Can not construct service");
error!("Cannot construct service");
return Err(());
}
}
@ -323,7 +323,7 @@ where
match fut.await {
Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService),
Err(e) => {
error!("Can not construct service: {:?}", e);
error!("Cannot construct service: {:?}", e);
Err(())
}
}

View file

@ -1,14 +1,11 @@
//! General purpose tcp server
#![allow(clippy::type_complexity)]
use std::error::Error;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::{future::Future, io, pin::Pin};
use futures::channel::mpsc::UnboundedSender;
use futures::channel::oneshot;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;
use crate::util::counter::Counter;
@ -74,7 +71,7 @@ thread_local! {
/// Ssl error combinded with service error.
#[derive(Debug)]
pub enum SslError<E> {
Ssl(Box<dyn Error>),
Ssl(Box<dyn std::error::Error>),
Service(E),
}
@ -111,11 +108,11 @@ impl Server {
}
fn signal(&self, sig: signals::Signal) {
let _ = self.0.unbounded_send(ServerCommand::Signal(sig));
let _ = self.0.send(ServerCommand::Signal(sig));
}
fn worker_faulted(&self, idx: usize) {
let _ = self.0.unbounded_send(ServerCommand::WorkerFaulted(idx));
let _ = self.0.send(ServerCommand::WorkerFaulted(idx));
}
/// Pause accepting incoming connections
@ -124,7 +121,7 @@ impl Server {
/// All opened connection remains active.
pub fn pause(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Pause(tx));
let _ = self.0.send(ServerCommand::Pause(tx));
async move {
let _ = rx.await;
}
@ -133,7 +130,7 @@ impl Server {
/// Resume accepting incoming connections
pub fn resume(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Resume(tx));
let _ = self.0.send(ServerCommand::Resume(tx));
async move {
let _ = rx.await;
}
@ -144,7 +141,7 @@ impl Server {
/// If server starts with `spawn()` method, then spawned thread get terminated.
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Stop {
let _ = self.0.send(ServerCommand::Stop {
graceful,
completion: Some(tx),
});
@ -168,7 +165,7 @@ impl Future for Server {
if this.1.is_none() {
let (tx, rx) = oneshot::channel();
if this.0.unbounded_send(ServerCommand::Notify(tx)).is_err() {
if this.0.send(ServerCommand::Notify(tx)).is_err() {
return Poll::Ready(Ok(()));
}
this.1 = Some(rx);

View file

@ -155,10 +155,10 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceResponse<T> {
}
let io = this.io.as_mut().unwrap();
let res = futures::ready!(Pin::new(io).poll_accept(cx));
match res {
Ok(_) => Poll::Ready(Ok(this.io.take().unwrap())),
Err(e) => Poll::Ready(Err(Box::new(e))),
match Pin::new(io).poll_accept(cx) {
Poll::Ready(Ok(_)) => Poll::Ready(Ok(this.io.take().unwrap())),
Poll::Ready(Err(e)) => Poll::Ready(Err(Box::new(e))),
Poll::Pending => Poll::Pending,
}
}
}

View file

@ -144,10 +144,10 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceFut<T> {
}
}
let res = futures::ready!(Pin::new(&mut this.fut).poll(cx));
match res {
Ok(io) => Poll::Ready(Ok(io)),
Err(e) => Poll::Ready(Err(Box::new(e))),
match Pin::new(&mut this.fut).poll(cx) {
Poll::Ready(Ok(io)) => Poll::Ready(Ok(io)),
Poll::Ready(Err(e)) => Poll::Ready(Err(Box::new(e))),
Poll::Pending => Poll::Pending,
}
}
}

View file

@ -83,7 +83,7 @@ where
match req {
ServerMessage::Connect(stream) => {
let stream = FromStream::from_stream(stream).map_err(|e| {
error!("Can not convert to an async io stream: {}", e);
error!("Cannot convert to an async io stream: {}", e);
});
if let Ok(stream) = stream {

View file

@ -50,7 +50,7 @@ impl Signals {
match unix::signal(*kind) {
Ok(stream) => signals.push((*sig, stream)),
Err(e) => log::error!(
"Can not initialize stream handler for {:?} err: {}",
"Cannot initialize stream handler for {:?} err: {}",
sig,
e
),

View file

@ -1,15 +1,13 @@
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::{pin::Pin, sync::Arc, time};
use std::{future::Future, pin::Pin, sync::Arc, time};
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot;
use futures::future::join_all;
use futures::{Future, Stream as FutStream};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use crate::rt::time::{sleep_until, Instant, Sleep};
use crate::rt::{spawn, Arbiter};
use crate::util::counter::Counter;
use crate::util::{counter::Counter, join_all};
use super::accept::{AcceptNotify, Command};
use super::service::{BoxedServerService, InternalServiceFactory, ServerMessage};
@ -78,9 +76,7 @@ impl WorkerClient {
}
pub(super) fn send(&self, msg: Connection) -> Result<(), Connection> {
self.tx1
.unbounded_send(WorkerCommand(msg))
.map_err(|msg| msg.into_inner().0)
self.tx1.send(WorkerCommand(msg)).map_err(|msg| msg.0 .0)
}
pub(super) fn available(&self) -> bool {
@ -89,7 +85,7 @@ impl WorkerClient {
pub(super) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
let (result, rx) = oneshot::channel();
let _ = self.tx2.unbounded_send(StopCommand { graceful, result });
let _ = self.tx2.send(StopCommand { graceful, result });
rx
}
}
@ -165,8 +161,8 @@ impl Worker {
availability: WorkerAvailability,
shutdown_timeout: time::Duration,
) -> WorkerClient {
let (tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded();
let (tx1, rx1) = unbounded_channel();
let (tx2, rx2) = unbounded_channel();
let avail = availability.clone();
Arbiter::default().exec_fn(move || {
@ -178,7 +174,7 @@ impl Worker {
let _ = spawn(wrk);
}
Err(e) => {
error!("Can not start worker: {:?}", e);
error!("Cannot start worker: {:?}", e);
Arbiter::current().stop();
}
}
@ -221,8 +217,7 @@ impl Worker {
}));
}
let res = join_all(fut).await;
let res: Result<Vec<_>, _> = res.into_iter().collect();
let res: Result<Vec<_>, _> = join_all(fut).await.into_iter().collect();
match res {
Ok(services) => {
for item in services {
@ -335,7 +330,7 @@ impl Future for Worker {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// `StopWorker` message handler
if let Poll::Ready(Some(StopCommand { graceful, result })) =
Pin::new(&mut self.rx2).poll_next(cx)
Pin::new(&mut self.rx2).poll_recv(cx)
{
self.availability.set(false);
let num = num_connections();
@ -409,7 +404,7 @@ impl Future for Worker {
}
Poll::Ready(Err(_)) => {
panic!(
"Can not restart {:?} service",
"Cannot restart {:?} service",
self.factories[idx].name(token)
);
}
@ -475,7 +470,7 @@ impl Future for Worker {
}
}
match Pin::new(&mut self.rx).poll_next(cx) {
match Pin::new(&mut self.rx).poll_recv(cx) {
// handle incoming io stream
Poll::Ready(Some(WorkerCommand(msg))) => {
let guard = self.conns.get();
@ -502,14 +497,13 @@ impl Future for Worker {
#[cfg(test)]
mod tests {
use futures::future::{lazy, ok, Ready};
use futures::SinkExt;
use std::sync::{Arc, Mutex};
use super::*;
use crate::rt::net::TcpStream;
use crate::server::service::Factory;
use crate::service::{Service, ServiceFactory};
use crate::util::{lazy, Ready};
#[derive(Clone, Copy, Debug)]
enum St {
@ -531,12 +525,12 @@ mod tests {
type Service = Srv;
type Config = ();
type InitError = ();
type Future = Ready<Result<Srv, ()>>;
type Future = Ready<Srv, ()>;
fn new_service(&self, _: ()) -> Self::Future {
let mut cnt = self.counter.lock().unwrap();
*cnt += 1;
ok(Srv {
Ready::ok(Srv {
st: self.st.clone(),
})
}
@ -550,7 +544,7 @@ mod tests {
type Request = TcpStream;
type Response = ();
type Error = ();
type Future = Ready<Result<(), ()>>;
type Future = Ready<(), ()>;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let st: St = { *self.st.lock().unwrap() };
@ -572,15 +566,15 @@ mod tests {
}
fn call(&self, _: TcpStream) -> Self::Future {
ok(())
Ready::ok(())
}
}
#[crate::rt_test]
#[allow(clippy::mutex_atomic)]
async fn basics() {
let (_tx1, rx1) = unbounded();
let (mut tx2, rx2) = unbounded();
let (_tx1, rx1) = unbounded_channel();
let (tx2, rx2) = unbounded_channel();
let (sync_tx, _sync_rx) = std::sync::mpsc::channel();
let poll = mio::Poll::new().unwrap();
let waker = Arc::new(mio::Waker::new(poll.registry(), mio::Token(1)).unwrap());
@ -650,7 +644,6 @@ mod tests {
graceful: true,
result: tx,
})
.await
.unwrap();
let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await;
@ -660,8 +653,8 @@ mod tests {
let _ = rx.await;
// force shutdown
let (_tx1, rx1) = unbounded();
let (mut tx2, rx2) = unbounded();
let (_tx1, rx1) = unbounded_channel();
let (tx2, rx2) = unbounded_channel();
let avail = WorkerAvailability::new(AcceptNotify::new(waker, sync_tx.clone()));
let f = SrvFactory {
st: st.clone(),
@ -695,7 +688,6 @@ mod tests {
graceful: false,
result: tx,
})
.await
.unwrap();
assert!(lazy(|cx| Pin::new(&mut worker).poll(cx)).await.is_ready());

View file

@ -1,15 +1,30 @@
use std::cell::{Cell, RefCell};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::{cmp, io, mem, time};
use std::task::{Context, Poll, Waker};
use std::{cmp, fmt, io, mem, pin::Pin, time};
use bytes::BytesMut;
use futures::future::poll_fn;
use futures::task::AtomicWaker;
use crate::codec::{AsyncRead, AsyncWrite, ReadBuf};
use crate::rt::time::sleep;
use crate::util::poll_fn;
#[derive(Default)]
struct AtomicWaker(Arc<Mutex<RefCell<Option<Waker>>>>);
impl AtomicWaker {
fn wake(&self) {
if let Some(waker) = self.0.lock().unwrap().borrow_mut().take() {
waker.wake()
}
}
}
impl fmt::Debug for AtomicWaker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "AtomicWaker")
}
}
/// Async io stream
#[derive(Debug)]
@ -196,7 +211,8 @@ impl Io {
if closed {
Poll::Ready(())
} else {
read.waker.register(cx.waker());
*read.waker.0.lock().unwrap().borrow_mut() =
Some(cx.waker().clone());
drop(read);
drop(guard);
Poll::Pending
@ -248,7 +264,7 @@ impl AsyncRead for Io {
) -> Poll<io::Result<()>> {
let guard = self.local.lock().unwrap();
let mut ch = guard.borrow_mut();
ch.waker.register(cx.waker());
*ch.waker.0.lock().unwrap().borrow_mut() = Some(cx.waker().clone());
if !ch.buf.is_empty() {
let size = std::cmp::min(ch.buf.len(), buf.remaining());
@ -288,23 +304,31 @@ impl AsyncWrite for Io {
ch.waker.wake();
Poll::Ready(Ok(cap))
} else {
self.local
*self
.local
.lock()
.unwrap()
.borrow_mut()
.waker
.register(cx.waker());
.0
.lock()
.unwrap()
.borrow_mut() = Some(cx.waker().clone());
Poll::Pending
}
}
IoState::Close => Poll::Ready(Ok(0)),
IoState::Pending => {
self.local
*self
.local
.lock()
.unwrap()
.borrow_mut()
.waker
.register(cx.waker());
.0
.lock()
.unwrap()
.borrow_mut() = Some(cx.waker().clone());
Poll::Pending
}
IoState::Err(e) => Poll::Ready(Err(e)),

View file

@ -5,13 +5,10 @@ use std::{
collections::VecDeque, convert::Infallible, future::Future, pin::Pin, rc::Rc,
};
use futures::future::Either;
use futures::ready;
use crate::channel::oneshot;
use crate::service::{IntoService, Service, Transform};
use crate::task::LocalWaker;
use crate::util::Ready;
use crate::util::{Either, Ready};
/// Buffer - service factory for service that can buffer incoming request.
///
@ -223,7 +220,10 @@ impl<S: Service<Error = E>, E> Future for BufferServiceResponse<S, E> {
Poll::Pending => return Poll::Pending,
},
StateProject::Srv { fut, inner } => {
let res = ready!(fut.poll(cx));
let res = match fut.poll(cx) {
Poll::Ready(res) => res,
Poll::Pending => return Poll::Pending,
};
inner.waker.wake();
return Poll::Ready(res);
}
@ -238,7 +238,7 @@ mod tests {
use super::*;
use crate::service::{apply, fn_factory, Service, ServiceFactory};
use futures::future::lazy;
use crate::util::lazy;
#[derive(Clone)]
struct TestService(Rc<Inner>);

View file

@ -119,7 +119,7 @@ mod tests {
use super::*;
use crate::service::{apply, fn_factory, Service, ServiceFactory};
use futures::future::{lazy, ok};
use crate::util::lazy;
struct SleepService(Duration);
@ -162,7 +162,10 @@ mod tests {
async fn test_newtransform() {
let wait_time = Duration::from_millis(50);
let srv = apply(InFlight::new(1), fn_factory(|| ok(SleepService(wait_time))));
let srv = apply(
InFlight::new(1),
fn_factory(|| async { Ok(SleepService(wait_time)) }),
);
let srv = srv.new_service(&()).await.unwrap();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

View file

@ -139,11 +139,10 @@ where
#[cfg(test)]
mod tests {
use futures::future::lazy;
use super::*;
use crate::rt::time::sleep;
use crate::service::{Service, ServiceFactory};
use crate::util::lazy;
#[derive(Debug, PartialEq)]
struct TestErr;

View file

@ -1,3 +1,5 @@
use std::{future::Future, pin::Pin, task::Context, task::Poll};
pub mod buffer;
pub mod counter;
mod extensions;
@ -11,11 +13,162 @@ pub mod variant;
pub use self::extensions::Extensions;
pub use ntex_service::util::{lazy, Lazy, Ready};
pub use ntex_service::util::{lazy, Either, Lazy, Ready};
pub use bytes::{Buf, BufMut, Bytes, BytesMut};
pub use bytestring::ByteString;
pub use either::Either;
pub type HashMap<K, V> = std::collections::HashMap<K, V, ahash::RandomState>;
pub type HashSet<V> = std::collections::HashSet<V, ahash::RandomState>;
/// Creates a new future wrapping around a function returning [`Poll`].
///
/// Polling the returned future delegates to the wrapped function.
pub fn poll_fn<T, F>(f: F) -> impl Future<Output = T>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
PollFn { f }
}
/// Future for the [`poll_fn`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
struct PollFn<F> {
f: F,
}
impl<F> Unpin for PollFn<F> {}
impl<T, F> Future for PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
(&mut self.f)(cx)
}
}
/// Creates a future that resolves to the next item in the stream.
pub async fn next<S>(stream: &mut S) -> Option<S::Item>
where
S: crate::Stream + Unpin,
{
poll_fn(|cx| Pin::new(&mut *stream).poll_next(cx)).await
}
/// A future that completes after the given item has been fully processed
/// into the sink, including flushing.
pub async fn send<S, I>(sink: &mut S, item: I) -> Result<(), S::Error>
where
S: crate::Sink<I> + Unpin,
{
poll_fn(|cx| Pin::new(&mut *sink).poll_ready(cx)).await?;
Pin::new(&mut *sink).start_send(item)?;
poll_fn(|cx| Pin::new(&mut *sink).poll_flush(cx)).await
}
/// Future for the `join` combinator, waiting for two futures to
/// complete.
pub async fn join<A, B>(fut_a: A, fut_b: B) -> (A::Output, B::Output)
where
A: Future,
B: Future,
{
tokio::pin!(fut_a);
tokio::pin!(fut_b);
let mut res_a = None;
let mut res_b = None;
poll_fn(|cx| {
if res_a.is_none() {
if let Poll::Ready(item) = Pin::new(&mut fut_a).poll(cx) {
res_a = Some(item)
}
}
if res_b.is_none() {
if let Poll::Ready(item) = Pin::new(&mut fut_b).poll(cx) {
res_b = Some(item)
}
}
if res_a.is_some() && res_b.is_some() {
Poll::Ready(())
} else {
Poll::Pending
}
})
.await;
(res_a.unwrap(), res_b.unwrap())
}
/// Waits for either one of two differently-typed futures to complete.
pub async fn select<A, B>(fut_a: A, fut_b: B) -> Either<A::Output, B::Output>
where
A: Future,
B: Future,
{
tokio::pin!(fut_a);
tokio::pin!(fut_b);
poll_fn(|cx| {
if let Poll::Ready(item) = Pin::new(&mut fut_a).poll(cx) {
Poll::Ready(Either::Left(item))
} else if let Poll::Ready(item) = Pin::new(&mut fut_b).poll(cx) {
Poll::Ready(Either::Right(item))
} else {
Poll::Pending
}
})
.await
}
enum MaybeDone<T>
where
T: Future,
{
Pending(T),
Done(T::Output),
}
/// Creates a future which represents a collection of the outputs of the futures given.
pub async fn join_all<F, T>(fut: Vec<F>) -> Vec<T>
where
F: Future<Output = T>,
{
let mut futs: Vec<_> = fut
.into_iter()
.map(|f| MaybeDone::Pending(Box::pin(f)))
.collect();
poll_fn(|cx| {
let mut pending = false;
for item in &mut futs {
if let MaybeDone::Pending(ref mut fut) = item {
if let Poll::Ready(res) = fut.as_mut().poll(cx) {
*item = MaybeDone::Done(res);
} else {
pending = true;
}
}
}
if pending {
Poll::Pending
} else {
Poll::Ready(())
}
})
.await;
futs.into_iter()
.map(|item| {
if let MaybeDone::Done(item) = item {
item
} else {
unreachable!()
}
})
.collect()
}

View file

@ -2,10 +2,9 @@ use std::{
cell::Cell, cell::RefCell, marker::PhantomData, pin::Pin, task::Context, task::Poll,
};
use futures::future::{ready, Ready};
use futures::Sink;
use futures_sink::Sink;
use crate::service::Service;
use crate::{service::Service, util::Ready};
/// `SinkService` forwards incoming requests to the provided `Sink`
pub struct SinkService<S, I> {
@ -48,7 +47,7 @@ where
type Request = I;
type Response = ();
type Error = S::Error;
type Future = Ready<Result<(), S::Error>>;
type Future = Ready<(), S::Error>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut inner = self.sink.borrow_mut();
@ -78,6 +77,6 @@ where
}
fn call(&self, req: I) -> Self::Future {
ready(Pin::new(&mut *self.sink.borrow_mut()).start_send(req))
Ready::result(Pin::new(&mut *self.sink.borrow_mut()).start_send(req))
}
}

View file

@ -1,9 +1,8 @@
use std::{fmt, pin::Pin, task::Context, task::Poll};
use futures::{ready, Future, FutureExt, Sink, SinkExt, Stream};
use std::{fmt, future::Future, pin::Pin, task::Context, task::Poll};
use crate::channel::mpsc;
use crate::service::{IntoService, Service};
use crate::{util::poll_fn, Sink, Stream};
pin_project_lite::pin_project! {
pub struct Dispatcher<R, S, T, U>
@ -65,12 +64,17 @@ where
if let Some(is_err) = this.shutdown {
if let Some(mut sink) = this.sink.take() {
crate::rt::spawn(async move {
if sink.flush().await.is_ok() {
let _ = sink.close().await;
if poll_fn(|cx| Pin::new(&mut sink).poll_flush(cx))
.await
.is_ok()
{
let _ = poll_fn(|cx| Pin::new(&mut sink).poll_close(cx)).await;
}
});
}
ready!(this.service.poll_shutdown(cx, *is_err));
if let Poll::Pending = this.service.poll_shutdown(cx, *is_err) {
return Poll::Pending;
}
return Poll::Ready(());
}
@ -126,9 +130,11 @@ where
Poll::Ready(Ok(_)) => match Pin::new(&mut this.stream).poll_next(cx) {
Poll::Ready(Some(Ok(item))) => {
let tx = this.rx.sender();
crate::rt::spawn(this.service.call(item).map(move |res| {
let fut = this.service.call(item);
crate::rt::spawn(async move {
let res = fut.await;
let _ = tx.send(res);
}));
});
this = self.as_mut().project();
continue;
}
@ -157,14 +163,13 @@ where
mod tests {
use bytes::BytesMut;
use bytestring::ByteString;
use futures::{future::ok, StreamExt};
use std::{cell::Cell, rc::Rc, time::Duration};
use super::*;
use crate::channel::mpsc;
use crate::codec::Encoder;
use crate::rt::time::sleep;
use crate::ws;
use crate::{util::next, ws};
#[crate::rt_test]
async fn test_basic() {
@ -181,10 +186,12 @@ mod tests {
encoder,
crate::fn_service(move |_| {
counter2.set(counter2.get() + 1);
ok(Some(ws::Message::Text(ByteString::from_static("test"))))
async { Ok(Some(ws::Message::Text(ByteString::from_static("test")))) }
}),
);
crate::rt::spawn(disp.map(|_| ()));
crate::rt::spawn(async move {
let _ = disp.await;
});
let mut buf = BytesMut::new();
let codec = ws::Codec::new().client_mode();
@ -193,12 +200,12 @@ mod tests {
.unwrap();
tx.send(Ok::<_, ()>(buf.split().freeze())).unwrap();
let data = rx.next().await.unwrap().unwrap();
let data = next(&mut rx).await.unwrap().unwrap();
assert_eq!(data, b"\x81\x04test".as_ref());
drop(tx);
sleep(Duration::from_millis(10)).await;
assert!(rx.next().await.is_none());
assert!(next(&mut rx).await.is_none());
assert_eq!(counter.get(), 1);
}

View file

@ -157,7 +157,7 @@ impl SystemTimeService {
#[cfg(test)]
mod tests {
use super::*;
use futures::future::lazy;
use crate::util::lazy;
use std::time::{Duration, SystemTime};
#[crate::rt_test]

View file

@ -2,15 +2,11 @@
//!
//! If the response does not complete within the specified timeout, the response
//! will be aborted.
use std::{
fmt, future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll, time,
};
use futures::future::Either;
use std::{fmt, future::Future, marker, pin::Pin, task::Context, task::Poll, time};
use crate::rt::time::{sleep, Sleep};
use crate::service::{IntoService, Service, Transform};
use crate::util::Ready;
use crate::util::{Either, Ready};
const ZERO: time::Duration = time::Duration::from_millis(0);
@ -20,7 +16,7 @@ const ZERO: time::Duration = time::Duration::from_millis(0);
#[derive(Debug)]
pub struct Timeout<E = ()> {
timeout: time::Duration,
_t: PhantomData<E>,
_t: marker::PhantomData<E>,
}
/// Timeout error
@ -74,7 +70,7 @@ impl<E> Timeout<E> {
pub fn new(timeout: time::Duration) -> Self {
Timeout {
timeout,
_t: PhantomData,
_t: marker::PhantomData,
}
}
}
@ -222,12 +218,12 @@ where
#[cfg(test)]
mod tests {
use derive_more::Display;
use futures::future::{lazy, ok};
use std::task::{Context, Poll};
use std::time::Duration;
use super::*;
use crate::service::{apply, fn_factory, Service, ServiceFactory};
use crate::util::lazy;
#[derive(Clone, Debug, PartialEq)]
struct SleepService(Duration);
@ -303,7 +299,7 @@ mod tests {
let timeout = apply(
Timeout::new(resolution).clone(),
fn_factory(|| ok::<_, ()>(SleepService(wait_time))),
fn_factory(|| async { Ok::<_, ()>(SleepService(wait_time)) }),
);
let srv = timeout.new_service(&()).await.unwrap();

View file

@ -319,11 +319,11 @@ variant_impl_and!(VariantFactory7, VariantFactory8, V8, v8, (V2, V3, V4, V5, V6,
#[cfg(test)]
mod tests {
use futures::future::{lazy, ok, Ready};
use std::task::{Context, Poll};
use super::*;
use crate::service::{fn_factory, Service, ServiceFactory};
use crate::util::{lazy, Ready};
#[derive(Clone)]
struct Srv1;
@ -332,7 +332,7 @@ mod tests {
type Request = ();
type Response = usize;
type Error = ();
type Future = Ready<Result<usize, ()>>;
type Future = Ready<usize, ()>;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
@ -343,7 +343,7 @@ mod tests {
}
fn call(&self, _: ()) -> Self::Future {
ok::<_, ()>(1)
Ready::<_, ()>::ok(1)
}
}
@ -354,7 +354,7 @@ mod tests {
type Request = ();
type Response = usize;
type Error = ();
type Future = Ready<Result<usize, ()>>;
type Future = Ready<usize, ()>;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
@ -365,15 +365,15 @@ mod tests {
}
fn call(&self, _: ()) -> Self::Future {
ok::<_, ()>(2)
Ready::<_, ()>::ok(2)
}
}
#[crate::rt_test]
async fn test_variant() {
let factory = variant(fn_factory(|| ok::<_, ()>(Srv1)))
.and(fn_factory(|| ok::<_, ()>(Srv2)))
.and(fn_factory(|| ok::<_, ()>(Srv2)))
let factory = variant(fn_factory(|| async { Ok::<_, ()>(Srv1) }))
.and(fn_factory(|| async { Ok::<_, ()>(Srv2) }))
.and(fn_factory(|| async { Ok::<_, ()>(Srv2) }))
.clone();
let service = factory.new_service(&()).await.clone().unwrap();

View file

@ -1,13 +1,11 @@
use std::{cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc};
use futures::future::Either;
use crate::http::Request;
use crate::router::ResourceDef;
use crate::service::boxed::{self, BoxServiceFactory};
use crate::service::{apply, apply_fn_factory, pipeline_factory};
use crate::service::{IntoServiceFactory, Service, ServiceFactory, Transform};
use crate::util::{Extensions, Ready};
use crate::util::{Either, Extensions, Ready};
use super::app_service::{AppEntry, AppFactory, AppRoutingFactory};
use super::config::{AppConfig, ServiceConfig};
@ -139,7 +137,7 @@ where
Box::pin(async move {
match fut.await {
Err(e) => {
log::error!("Can not construct data instance: {:?}", e);
log::error!("Cannot construct data instance: {:?}", e);
Err(())
}
Ok(data) => {
@ -293,7 +291,7 @@ where
{
// create and configure default resource
self.default = Some(Rc::new(boxed::factory(f.into_factory().map_init_err(
|e| log::error!("Can not construct default service: {:?}", e),
|e| log::error!("Cannot construct default service: {:?}", e),
))));
self
@ -628,7 +626,6 @@ where
#[cfg(test)]
mod tests {
use bytes::Bytes;
use futures::future::ok;
use super::*;
use crate::http::header::{self, HeaderValue};
@ -659,13 +656,13 @@ mod tests {
.service(web::resource("/test").to(|| async { HttpResponse::Ok() }))
.service(
web::resource("/test2")
.default_service(|r: WebRequest<DefaultError>| {
ok(r.into_response(HttpResponse::Created()))
.default_service(|r: WebRequest<DefaultError>| async move {
Ok(r.into_response(HttpResponse::Created()))
})
.route(web::get().to(|| async { HttpResponse::Ok() })),
)
.default_service(|r: WebRequest<DefaultError>| {
ok(r.into_response(HttpResponse::MethodNotAllowed()))
.default_service(|r: WebRequest<DefaultError>| async move {
Ok(r.into_response(HttpResponse::MethodNotAllowed()))
}),
)
.await;
@ -688,10 +685,12 @@ mod tests {
#[crate::rt_test]
async fn test_data_factory() {
let srv = init_service(
App::new().data_factory(|| ok::<_, ()>(10usize)).service(
web::resource("/")
.to(|_: web::types::Data<usize>| async { HttpResponse::Ok() }),
),
App::new()
.data_factory(|| async { Ok::<_, ()>(10usize) })
.service(
web::resource("/")
.to(|_: web::types::Data<usize>| async { HttpResponse::Ok() }),
),
)
.await;
let req = TestRequest::default().to_request();
@ -699,10 +698,12 @@ mod tests {
assert_eq!(resp.status(), StatusCode::OK);
let srv = init_service(
App::new().data_factory(|| ok::<_, ()>(10u32)).service(
web::resource("/")
.to(|_: web::types::Data<usize>| async { HttpResponse::Ok() }),
),
App::new()
.data_factory(|| async { Ok::<_, ()>(10u32) })
.service(
web::resource("/")
.to(|_: web::types::Data<usize>| async { HttpResponse::Ok() }),
),
)
.await;
let req = TestRequest::default().to_request();

View file

@ -1,13 +1,9 @@
//! Web error
use std::cell::RefCell;
use std::fmt;
use std::io::Write;
use std::marker::PhantomData;
use std::{cell::RefCell, fmt, io::Write, marker::PhantomData};
use bytes::BytesMut;
use derive_more::{Display, From};
pub use futures::channel::oneshot::Canceled;
pub use http::Error as HttpError;
pub use serde_json::error::Error as JsonError;
pub use url::ParseError as UrlParseError;
@ -16,6 +12,7 @@ use super::{HttpRequest, HttpResponse};
use crate::http::body::Body;
use crate::http::helpers::Writer;
use crate::http::{error, header, StatusCode};
use crate::util::Either;
pub use super::error_default::{DefaultError, Error};
pub use crate::http::error::BlockingError;
@ -59,7 +56,7 @@ where
impl<Err: ErrorRenderer> WebResponseError<Err> for std::convert::Infallible {}
impl<A, B, Err> WebResponseError<Err> for either::Either<A, B>
impl<A, B, Err> WebResponseError<Err> for Either<A, B>
where
A: WebResponseError<Err>,
B: WebResponseError<Err>,
@ -67,15 +64,15 @@ where
{
fn status_code(&self) -> StatusCode {
match self {
either::Either::Left(ref a) => a.status_code(),
either::Either::Right(ref b) => b.status_code(),
Either::Left(ref a) => a.status_code(),
Either::Right(ref b) => b.status_code(),
}
}
fn error_response(&self, req: &HttpRequest) -> HttpResponse {
match self {
either::Either::Left(ref a) => a.error_response(req),
either::Either::Right(ref b) => b.error_response(req),
Either::Left(ref a) => a.error_response(req),
Either::Right(ref b) => b.error_response(req),
}
}
}
@ -104,8 +101,8 @@ pub enum UrlGenerationError {
/// A set of errors that can occur during parsing urlencoded payloads
#[derive(Debug, Display, From)]
pub enum UrlencodedError {
/// Can not decode chunked transfer encoding
#[display(fmt = "Can not decode chunked transfer encoding")]
/// Cannot decode chunked transfer encoding
#[display(fmt = "Cannot decode chunked transfer encoding")]
Chunked,
/// Payload size is bigger than allowed. (default: 256kB)
#[display(
@ -170,7 +167,7 @@ pub enum PayloadError {
Payload(error::PayloadError),
#[display(fmt = "{}", _0)]
ContentType(error::ContentTypeError),
#[display(fmt = "Can not decode body")]
#[display(fmt = "Cannot decode body")]
Decoding,
}
@ -760,15 +757,15 @@ mod tests {
fn test_either_error() {
let req = TestRequest::default().to_http_request();
let err: either::Either<SendRequestError, PayloadError> =
either::Either::Left(SendRequestError::TunnelNotSupported);
let err: Either<SendRequestError, PayloadError> =
Either::Left(SendRequestError::TunnelNotSupported);
let code = WebResponseError::<DefaultError>::status_code(&err);
assert_eq!(code, StatusCode::INTERNAL_SERVER_ERROR);
let resp = WebResponseError::<DefaultError>::error_response(&err, &req);
assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
let err: either::Either<SendRequestError, PayloadError> =
either::Either::Right(PayloadError::Decoding);
let err: Either<SendRequestError, PayloadError> =
Either::Right(PayloadError::Decoding);
let code = WebResponseError::<DefaultError>::status_code(&err);
assert_eq!(code, StatusCode::BAD_REQUEST);
let resp = WebResponseError::<DefaultError>::error_response(&err, &req);

View file

@ -35,10 +35,9 @@ pub trait FromRequest<Err>: Sized {
/// ```rust
/// use ntex::{http, util::Ready};
/// use ntex::web::{self, error, App, HttpRequest, FromRequest, DefaultError};
/// use serde_derive::Deserialize;
/// use rand;
///
/// #[derive(Debug, Deserialize)]
/// #[derive(Debug, serde::Deserialize)]
/// struct Thing {
/// name: String
/// }
@ -106,10 +105,9 @@ where
/// ```rust
/// use ntex::{http, util::Ready};
/// use ntex::web::{self, error, App, HttpRequest, FromRequest};
/// use serde_derive::Deserialize;
/// use rand;
///
/// #[derive(Debug, Deserialize)]
/// #[derive(Debug, serde::Deserialize)]
/// struct Thing {
/// name: String
/// }
@ -255,14 +253,13 @@ tuple_from_req!(TupleFromRequest10, (0, A), (1, B), (2, C), (3, D), (4, E), (5,
#[cfg(test)]
mod tests {
use bytes::Bytes;
use serde_derive::Deserialize;
use crate::http::header;
use crate::web::error::UrlencodedError;
use crate::web::test::{from_request, TestRequest};
use crate::web::types::{Form, FormConfig};
#[derive(Deserialize, Debug, PartialEq)]
#[derive(serde::Deserialize, Debug, PartialEq)]
struct Info {
hello: String,
}

View file

@ -259,9 +259,8 @@ impl Drop for HttpRequest {
///
/// ```rust
/// use ntex::web::{self, App, HttpRequest};
/// use serde_derive::Deserialize;
///
/// /// extract `Thing` from request
/// /// extract `HttpRequest` from request
/// async fn index(req: HttpRequest) -> String {
/// format!("Got thing: {:?}", req)
/// }

View file

@ -134,8 +134,8 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match futures::ready!(this.fut.poll(cx)) {
Ok(resp) => {
match this.fut.poll(cx)? {
Poll::Ready(resp) => {
let enc = if let Some(enc) = resp.response().get_encoding() {
enc
} else {
@ -146,7 +146,7 @@ where
resp.map_body(move |head, body| Encoder::response(enc, head, body))
))
}
Err(e) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
}
}

View file

@ -71,9 +71,9 @@ impl DefaultHeaders {
.headers
.append(key, value);
}
Err(_) => panic!("Can not create header value"),
Err(_) => panic!("Cannot create header value"),
},
Err(_) => panic!("Can not create header name"),
Err(_) => panic!("Cannot create header name"),
}
self
}
@ -161,11 +161,10 @@ where
#[cfg(test)]
mod tests {
use futures::future::lazy;
use super::*;
use crate::http::header::CONTENT_TYPE;
use crate::service::IntoService;
use crate::util::lazy;
use crate::web::request::WebRequest;
use crate::web::test::{ok_service, TestRequest};
use crate::web::{DefaultError, Error, HttpResponse};

View file

@ -1,17 +1,15 @@
//! Request logging middleware
use std::fmt::{self, Display};
use std::task::{Context, Poll};
use std::{convert::TryFrom, env, error::Error, future::Future, pin::Pin, rc::Rc};
use std::{convert::TryFrom, env, error::Error, future::Future, pin::Pin, rc::Rc, time};
use bytes::Bytes;
use futures::future::Either;
use regex::Regex;
use time::OffsetDateTime;
use crate::http::body::{Body, BodySize, MessageBody, ResponseBody};
use crate::http::header::HeaderName;
use crate::service::{Service, Transform};
use crate::util::{HashSet, Ready};
use crate::util::{Either, HashSet, Ready};
use crate::web::dev::{WebRequest, WebResponse};
use crate::web::HttpResponse;
@ -166,7 +164,7 @@ where
if self.inner.exclude.contains(req.path()) {
Either::Right(self.service.call(req))
} else {
let time = OffsetDateTime::now_utc();
let time = time::SystemTime::now();
let mut format = self.inner.format.clone();
for unit in &mut format.0 {
@ -187,7 +185,7 @@ pin_project_lite::pin_project! {
{
#[pin]
fut: S::Future,
time: OffsetDateTime,
time: time::SystemTime,
format: Option<Format>,
}
}
@ -201,9 +199,10 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let res = match futures::ready!(this.fut.poll(cx)) {
Ok(res) => res,
Err(e) => return Poll::Ready(Err(e)),
let res = match this.fut.poll(cx) {
Poll::Ready(Ok(res)) => res,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => return Poll::Pending,
};
if let Some(ref mut format) = this.format {
@ -230,7 +229,7 @@ struct StreamLog {
body: ResponseBody<Body>,
format: Option<Format>,
size: usize,
time: OffsetDateTime,
time: time::SystemTime,
}
impl Drop for StreamLog {
@ -357,20 +356,20 @@ impl FormatText {
&self,
fmt: &mut fmt::Formatter<'_>,
size: usize,
entry_time: OffsetDateTime,
entry_time: time::SystemTime,
) -> Result<(), fmt::Error> {
match *self {
FormatText::Str(ref string) => fmt.write_str(string),
FormatText::Percent => "%".fmt(fmt),
FormatText::ResponseSize => size.fmt(fmt),
FormatText::Time => {
let rt = OffsetDateTime::now_utc() - entry_time;
let rt = rt.as_seconds_f64();
let rt = entry_time.elapsed().unwrap();
let rt = rt.as_secs_f64();
fmt.write_fmt(format_args!("{:.6}", rt))
}
FormatText::TimeMillis => {
let rt = OffsetDateTime::now_utc() - entry_time;
let rt = (rt.whole_nanoseconds() as f64) / 1_000_000.0;
let rt = entry_time.elapsed().unwrap();
let rt = (rt.as_nanos() as f64) / 1_000_000.0;
fmt.write_fmt(format_args!("{:.6}", rt))
}
FormatText::EnvironHeader(ref name) => {
@ -405,7 +404,7 @@ impl FormatText {
}
}
fn render_request<E>(&mut self, now: OffsetDateTime, req: &WebRequest<E>) {
fn render_request<E>(&mut self, now: time::SystemTime, req: &WebRequest<E>) {
match *self {
FormatText::RequestLine => {
*self = if req.query_string().is_empty() {
@ -427,7 +426,7 @@ impl FormatText {
}
FormatText::UrlPath => *self = FormatText::Str(req.path().to_string()),
FormatText::RequestTime => {
*self = FormatText::Str(now.format("%Y-%m-%dT%H:%M:%S"))
*self = FormatText::Str(httpdate::HttpDate::from(now).to_string())
}
FormatText::RequestHeader(ref name) => {
let s = if let Some(val) = req.headers().get(name) {
@ -466,18 +465,17 @@ impl<'a> fmt::Display for FormatDisplay<'a> {
#[cfg(test)]
mod tests {
use futures::future::{lazy, ok};
use super::*;
use crate::http::{header, StatusCode};
use crate::service::{IntoService, Service, Transform};
use crate::util::lazy;
use crate::web::test::{self, TestRequest};
use crate::web::{DefaultError, Error};
#[crate::rt_test]
async fn test_logger() {
let srv = |req: WebRequest<DefaultError>| {
ok::<_, Error>(
let srv = |req: WebRequest<DefaultError>| async move {
Ok::<_, Error>(
req.into_response(
HttpResponse::build(StatusCode::OK)
.header("X-Test", "ttt")
@ -523,7 +521,7 @@ mod tests {
.uri("/test/route/yeah?q=test")
.to_srv_request();
let now = OffsetDateTime::now_utc();
let now = time::SystemTime::now();
for unit in &mut format.0 {
unit.render_request(now, &req);
}
@ -553,7 +551,7 @@ mod tests {
)
.to_srv_request();
let now = OffsetDateTime::now_utc();
let now = time::SystemTime::now();
for unit in &mut format.0 {
unit.render_request(now, &req);
}
@ -563,7 +561,7 @@ mod tests {
unit.render_response(&resp);
}
let entry_time = OffsetDateTime::now_utc();
let entry_time = time::SystemTime::now();
let render = |fmt: &mut fmt::Formatter<'_>| {
for unit in &format.0 {
unit.render(fmt, 1024, entry_time)?;
@ -581,7 +579,7 @@ mod tests {
let mut format = Format::new("%t");
let req = TestRequest::default().to_srv_request();
let now = OffsetDateTime::now_utc();
let now = time::SystemTime::now();
for unit in &mut format.0 {
unit.render_request(now, &req);
}
@ -598,6 +596,6 @@ mod tests {
Ok(())
};
let s = format!("{}", FormatDisplay(&render));
assert!(s.contains(&now.format("%Y-%m-%dT%H:%M:%S")));
assert!(s.contains(&httpdate::HttpDate::from(now).to_string()));
}
}

View file

@ -100,7 +100,6 @@ pub use ntex_macros::web_trace as trace;
pub use crate::http::Response as HttpResponse;
pub use crate::http::ResponseBuilder as HttpResponseBuilder;
pub use either::Either;
pub use self::app::App;
pub use self::config::ServiceConfig;

View file

@ -2,14 +2,12 @@ use std::{
cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll,
};
use futures::future::Either;
use crate::http::Response;
use crate::router::{IntoPattern, ResourceDef};
use crate::service::boxed::{self, BoxService, BoxServiceFactory};
use crate::service::{apply, apply_fn_factory, pipeline_factory};
use crate::service::{IntoServiceFactory, Service, ServiceFactory, Transform};
use crate::util::{Extensions, Ready};
use crate::util::{Either, Extensions, Ready};
use super::dev::{insert_slesh, WebServiceConfig, WebServiceFactory};
use super::error::ErrorRenderer;
@ -412,7 +410,7 @@ where
// create and configure default resource
self.default = Rc::new(RefCell::new(Some(Rc::new(boxed::factory(
f.into_factory().map_init_err(|e| {
log::error!("Can not construct default service: {:?}", e)
log::error!("Cannot construct default service: {:?}", e)
}),
)))));
@ -580,8 +578,6 @@ impl<Err: ErrorRenderer> ServiceFactory for ResourceEndpoint<Err> {
mod tests {
use std::time::Duration;
use futures::future::Either;
use crate::http::header::{self, HeaderValue};
use crate::http::{Method, StatusCode};
use crate::rt::time::sleep;
@ -589,7 +585,7 @@ mod tests {
use crate::web::request::WebRequest;
use crate::web::test::{call_service, init_service, TestRequest};
use crate::web::{self, guard, App, DefaultError, HttpResponse};
use crate::{fn_service, Service};
use crate::{fn_service, util::Either, Service};
#[crate::rt_test]
async fn test_filter() {

View file

@ -1,22 +1,37 @@
use std::convert::TryFrom;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{convert::TryFrom, future::Future, marker::PhantomData, pin::Pin};
use bytes::{Bytes, BytesMut};
use futures::future::{ready, Either as EitherFuture, Ready};
use futures::ready;
use crate::http::error::HttpError;
use crate::http::header::{HeaderMap, HeaderName, HeaderValue};
use crate::http::{Response, ResponseBuilder, StatusCode};
use crate::util::Either;
use super::error::{
DefaultError, ErrorContainer, ErrorRenderer, InternalError, WebResponseError,
};
use super::httprequest::HttpRequest;
pub struct Ready<T>(Option<T>);
impl<T> Unpin for Ready<T> {}
impl<T> From<T> for Ready<T> {
fn from(t: T) -> Self {
Ready(Some(t))
}
}
impl<T> Future for Ready<T> {
type Output = T;
#[inline]
fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<T> {
Poll::Ready(self.0.take().expect("Ready polled after completion"))
}
}
/// Trait implemented by types that can be converted to a http response.
///
/// Types that implement this trait can be used as the return type of a handler.
@ -85,7 +100,7 @@ impl<Err: ErrorRenderer> Responder<Err> for Response {
#[inline]
fn respond_to(self, _: &HttpRequest) -> Self::Future {
ready(self)
Ready(Some(self))
}
}
@ -95,7 +110,7 @@ impl<Err: ErrorRenderer> Responder<Err> for ResponseBuilder {
#[inline]
fn respond_to(mut self, _: &HttpRequest) -> Self::Future {
ready(self.finish())
Ready(Some(self.finish()))
}
}
@ -105,14 +120,14 @@ where
Err: ErrorRenderer,
{
type Error = T::Error;
type Future = EitherFuture<T::Future, Ready<Response>>;
type Future = Either<T::Future, Ready<Response>>;
fn respond_to(self, req: &HttpRequest) -> Self::Future {
match self {
Some(t) => EitherFuture::Left(t.respond_to(req)),
None => EitherFuture::Right(ready(
Some(t) => Either::Left(t.respond_to(req)),
None => Either::Right(Ready(Some(
Response::build(StatusCode::NOT_FOUND).finish(),
)),
))),
}
}
}
@ -124,12 +139,12 @@ where
Err: ErrorRenderer,
{
type Error = T::Error;
type Future = EitherFuture<T::Future, Ready<Response>>;
type Future = Either<T::Future, Ready<Response>>;
fn respond_to(self, req: &HttpRequest) -> Self::Future {
match self {
Ok(val) => EitherFuture::Left(val.respond_to(req)),
Err(e) => EitherFuture::Right(ready(e.into().error_response(req))),
Ok(val) => Either::Left(val.respond_to(req)),
Err(e) => Either::Right(Ready(Some(e.into().error_response(req)))),
}
}
}
@ -156,11 +171,11 @@ impl<Err: ErrorRenderer> Responder<Err> for &'static str {
type Future = Ready<Response>;
fn respond_to(self, _: &HttpRequest) -> Self::Future {
ready(
Ready(Some(
Response::build(StatusCode::OK)
.content_type("text/plain; charset=utf-8")
.body(self),
)
))
}
}
@ -169,11 +184,11 @@ impl<Err: ErrorRenderer> Responder<Err> for &'static [u8] {
type Future = Ready<Response>;
fn respond_to(self, _: &HttpRequest) -> Self::Future {
ready(
Ready(Some(
Response::build(StatusCode::OK)
.content_type("application/octet-stream")
.body(self),
)
))
}
}
@ -182,11 +197,11 @@ impl<Err: ErrorRenderer> Responder<Err> for String {
type Future = Ready<Response>;
fn respond_to(self, _: &HttpRequest) -> Self::Future {
ready(
Ready(Some(
Response::build(StatusCode::OK)
.content_type("text/plain; charset=utf-8")
.body(self),
)
))
}
}
@ -195,11 +210,11 @@ impl<'a, Err: ErrorRenderer> Responder<Err> for &'a String {
type Future = Ready<Response>;
fn respond_to(self, _: &HttpRequest) -> Self::Future {
ready(
Ready(Some(
Response::build(StatusCode::OK)
.content_type("text/plain; charset=utf-8")
.body(self),
)
))
}
}
@ -208,11 +223,11 @@ impl<Err: ErrorRenderer> Responder<Err> for Bytes {
type Future = Ready<Response>;
fn respond_to(self, _: &HttpRequest) -> Self::Future {
ready(
Ready(Some(
Response::build(StatusCode::OK)
.content_type("application/octet-stream")
.body(self),
)
))
}
}
@ -221,11 +236,11 @@ impl<Err: ErrorRenderer> Responder<Err> for BytesMut {
type Future = Ready<Response>;
fn respond_to(self, _: &HttpRequest) -> Self::Future {
ready(
Ready(Some(
Response::build(StatusCode::OK)
.content_type("application/octet-stream")
.body(self),
)
))
}
}
@ -336,7 +351,12 @@ impl<T: Responder<Err>, Err: ErrorRenderer> Future for CustomResponderFut<T, Err
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut res = ready!(this.fut.poll(cx));
let mut res = if let Poll::Ready(res) = this.fut.poll(cx) {
res
} else {
return Poll::Pending;
};
if let Some(status) = this.status.take() {
*res.status_mut() = status;
}
@ -352,7 +372,7 @@ impl<T: Responder<Err>, Err: ErrorRenderer> Future for CustomResponderFut<T, Err
/// Combines two different responder types into a single type
///
/// ```rust
/// use ntex::web::{Either, HttpResponse};
/// use ntex::{web::HttpResponse, util::Either};
///
/// fn index() -> Either<HttpResponse, &'static str> {
/// if is_a_variant() {
@ -366,19 +386,19 @@ impl<T: Responder<Err>, Err: ErrorRenderer> Future for CustomResponderFut<T, Err
/// # fn is_a_variant() -> bool { true }
/// # fn main() {}
/// ```
impl<A, B, Err> Responder<Err> for either::Either<A, B>
impl<A, B, Err> Responder<Err> for Either<A, B>
where
A: Responder<Err>,
B: Responder<Err>,
Err: ErrorRenderer,
{
type Error = Err::Container;
type Future = EitherFuture<A::Future, B::Future>;
type Future = Either<A::Future, B::Future>;
fn respond_to(self, req: &HttpRequest) -> Self::Future {
match self {
either::Either::Left(a) => EitherFuture::Left(a.respond_to(req)),
either::Either::Right(b) => EitherFuture::Right(b.respond_to(req)),
Either::Left(a) => Either::Left(a.respond_to(req)),
Either::Right(b) => Either::Right(b.respond_to(req)),
}
}
}
@ -392,7 +412,7 @@ where
type Future = Ready<Response>;
fn respond_to(self, req: &HttpRequest) -> Self::Future {
ready(self.error_response(req))
Ready(Some(self.error_response(req)))
}
}
@ -419,9 +439,9 @@ pub(crate) mod tests {
let srv = init_service(web::App::new().service(
web::resource("/index.html").to(|req: HttpRequest| async move {
if req.query_string().is_empty() {
either::Either::Left(HttpResponse::BadRequest())
Either::Left(HttpResponse::BadRequest())
} else {
either::Either::Right("hello")
Either::Right("hello")
}
}),
))

View file

@ -146,9 +146,8 @@ impl<Err: ErrorRenderer> Route<Err> {
///
/// ```rust
/// use ntex::web;
/// use serde_derive::Deserialize;
///
/// #[derive(Deserialize)]
/// #[derive(serde::Deserialize)]
/// struct Info {
/// username: String,
/// }
@ -170,10 +169,9 @@ impl<Err: ErrorRenderer> Route<Err> {
///
/// ```rust
/// # use std::collections::HashMap;
/// # use serde_derive::Deserialize;
/// use ntex::web;
///
/// #[derive(Deserialize)]
/// #[derive(serde::Deserialize)]
/// struct Info {
/// username: String,
/// }
@ -279,14 +277,13 @@ mod tests {
use std::time::Duration;
use bytes::Bytes;
use serde_derive::Serialize;
use crate::http::{Method, StatusCode};
use crate::rt::time::sleep;
use crate::web::test::{call_service, init_service, read_body, TestRequest};
use crate::web::{self, error, App, DefaultError, HttpResponse};
#[derive(Serialize, PartialEq, Debug)]
#[derive(serde::Serialize, PartialEq, Debug)]
struct MyObject {
name: String,
}

View file

@ -2,14 +2,12 @@ use std::{
cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll,
};
use futures::future::Either;
use crate::http::Response;
use crate::router::{IntoPattern, ResourceDef, ResourceInfo, Router};
use crate::service::boxed::{self, BoxService, BoxServiceFactory};
use crate::service::{apply, apply_fn_factory, pipeline_factory};
use crate::service::{IntoServiceFactory, Service, ServiceFactory, Transform};
use crate::util::{Extensions, Ready};
use crate::util::{Either, Extensions, Ready};
use super::config::ServiceConfig;
use super::dev::{WebServiceConfig, WebServiceFactory};
@ -306,7 +304,7 @@ where
// create and configure default resource
self.default = Rc::new(RefCell::new(Some(Rc::new(boxed::factory(
f.into_factory().map_init_err(|e| {
log::error!("Can not construct default service: {:?}", e)
log::error!("Cannot construct default service: {:?}", e)
}),
)))));
@ -677,13 +675,11 @@ impl<Err: ErrorRenderer> ServiceFactory for ScopeEndpoint<Err> {
#[cfg(test)]
mod tests {
use bytes::Bytes;
use futures::future::Either;
use crate::http::body::{Body, ResponseBody};
use crate::http::header::{HeaderValue, CONTENT_TYPE};
use crate::http::{Method, StatusCode};
use crate::service::{fn_service, Service};
use crate::util::{Bytes, Either};
use crate::web::middleware::DefaultHeaders;
use crate::web::request::WebRequest;
use crate::web::test::{call_service, init_service, read_body, TestRequest};

View file

@ -1,13 +1,9 @@
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use std::{fmt, io, net};
use std::{fmt, io, marker::PhantomData, net, sync::Arc, sync::Mutex};
#[cfg(feature = "openssl")]
use crate::server::openssl::{AlpnError, SslAcceptor, SslAcceptorBuilder};
#[cfg(feature = "rustls")]
use crate::server::rustls::ServerConfig as RustlsServerConfig;
#[cfg(unix)]
use futures::future::ok;
#[cfg(unix)]
use crate::http::Protocol;
@ -433,7 +429,7 @@ where
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"Can not bind to address.",
"Cannot bind to address.",
))
}
} else {
@ -505,7 +501,10 @@ where
socket_addr,
c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)),
);
pipeline_factory(|io: UnixStream| ok((io, Protocol::Http1, None))).and_then(
pipeline_factory(|io: UnixStream| {
crate::util::Ready::ok((io, Protocol::Http1, None))
})
.and_then(
HttpService::build()
.keep_alive(c.keep_alive)
.client_timeout(c.client_timeout)
@ -543,14 +542,16 @@ where
socket_addr,
c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)),
);
pipeline_factory(|io: UnixStream| ok((io, Protocol::Http1, None)))
.and_then(
HttpService::build()
.keep_alive(c.keep_alive)
.client_timeout(c.client_timeout)
.buffer_params(c.read_hw, c.write_hw, c.lw)
.finish(map_config(factory(), move |_| config.clone())),
)
pipeline_factory(|io: UnixStream| {
crate::util::Ready::ok((io, Protocol::Http1, None))
})
.and_then(
HttpService::build()
.keep_alive(c.keep_alive)
.client_timeout(c.client_timeout)
.buffer_params(c.read_hw, c.write_hw, c.lw)
.finish(map_config(factory(), move |_| config.clone())),
)
},
)?;
Ok(self)

View file

@ -365,8 +365,6 @@ tuple_web_service!((0,A),(1,B),(2,C),(3,D),(4,E),(5,F),(6,G),(7,H),(8,I),(9,J),(
#[cfg(test)]
mod tests {
use futures::future::ok;
use super::*;
use crate::http::{Method, StatusCode};
use crate::service::Service;
@ -398,8 +396,8 @@ mod tests {
async fn test_service() {
let srv = init_service(App::new().service(
web::service("/test").name("test").finish(
|req: WebRequest<DefaultError>| {
ok(req.into_response(HttpResponse::Ok().finish()))
|req: WebRequest<DefaultError>| async move {
Ok(req.into_response(HttpResponse::Ok().finish()))
},
),
))
@ -410,8 +408,8 @@ mod tests {
let srv = init_service(App::new().service(
web::service("/test").guard(guard::Get()).finish(
|req: WebRequest<DefaultError>| {
ok(req.into_response(HttpResponse::Ok().finish()))
|req: WebRequest<DefaultError>| async move {
Ok(req.into_response(HttpResponse::Ok().finish()))
},
),
))

View file

@ -5,8 +5,7 @@ use std::{
};
use bytes::{Bytes, BytesMut};
use futures::future::ok;
use futures::stream::{Stream, StreamExt};
use futures_core::Stream;
use serde::de::DeserializeOwned;
use serde::Serialize;
@ -24,7 +23,7 @@ use crate::http::{HttpService, Method, Payload, Request, StatusCode, Uri, Versio
use crate::router::{Path, ResourceDef};
use crate::rt::{time::sleep, System};
use crate::server::Server;
use crate::util::Extensions;
use crate::util::{next, Extensions, Ready};
use crate::{map_config, IntoService, IntoServiceFactory, Service, ServiceFactory};
use crate::web::config::AppConfig;
@ -52,7 +51,7 @@ pub fn default_service<Err: ErrorRenderer>(
Error = std::convert::Infallible,
> {
(move |req: WebRequest<Err>| {
ok(req.into_response(HttpResponse::build(status_code).finish()))
Ready::ok(req.into_response(HttpResponse::build(status_code).finish()))
})
.into_service()
}
@ -165,7 +164,7 @@ where
let mut body = resp.take_body();
let mut bytes = BytesMut::new();
while let Some(item) = body.next().await {
while let Some(item) = next(&mut body).await {
bytes.extend_from_slice(&item.unwrap());
}
bytes.freeze()
@ -201,7 +200,7 @@ where
pub async fn read_body(mut res: WebResponse) -> Bytes {
let mut body = res.take_body();
let mut bytes = BytesMut::new();
while let Some(item) = body.next().await {
while let Some(item) = next(&mut body).await {
bytes.extend_from_slice(&item.unwrap());
}
bytes.freeze()
@ -213,7 +212,7 @@ where
S: Stream<Item = Result<Bytes, Box<dyn Error>>> + Unpin,
{
let mut data = BytesMut::new();
while let Some(item) = stream.next().await {
while let Some(item) = next(&mut stream).await {
data.extend_from_slice(&item?);
}
Ok(data.freeze())
@ -735,7 +734,7 @@ where
builder.set_verify(SslVerifyMode::NONE);
let _ = builder
.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(|e| log::error!("Can not set alpn protocol: {:?}", e));
.map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e));
Connector::default()
.lifetime(time::Duration::from_secs(0))
.keep_alive(time::Duration::from_millis(30000))

View file

@ -4,8 +4,6 @@ use std::{fmt, future::Future, ops, pin::Pin, task::Context, task::Poll};
use bytes::BytesMut;
use encoding_rs::{Encoding, UTF_8};
use futures::future::{ready, Ready};
use futures::StreamExt;
use serde::de::DeserializeOwned;
use serde::Serialize;
@ -13,8 +11,10 @@ use serde::Serialize;
use crate::http::encoding::Decoder;
use crate::http::header::{CONTENT_LENGTH, CONTENT_TYPE};
use crate::http::{HttpMessage, Payload, Response, StatusCode};
use crate::util::next;
use crate::web::error::{ErrorRenderer, UrlencodedError, WebResponseError};
use crate::web::{FromRequest, HttpRequest, Responder};
use crate::web::responder::{Ready, Responder};
use crate::web::{FromRequest, HttpRequest};
/// Form data helper (`application/x-www-form-urlencoded`)
///
@ -32,9 +32,8 @@ use crate::web::{FromRequest, HttpRequest, Responder};
/// ### Example
/// ```rust
/// use ntex::web;
/// use serde_derive::Deserialize;
///
/// #[derive(Deserialize)]
/// #[derive(serde::Deserialize)]
/// struct FormData {
/// username: String,
/// }
@ -57,9 +56,8 @@ use crate::web::{FromRequest, HttpRequest, Responder};
/// ### Example
/// ```rust
/// use ntex::web;
/// use serde_derive::Serialize;
///
/// #[derive(Serialize)]
/// #[derive(serde::Serialize)]
/// struct SomeForm {
/// name: String,
/// age: u8
@ -147,14 +145,13 @@ where
fn respond_to(self, req: &HttpRequest) -> Self::Future {
let body = match serde_urlencoded::to_string(&self.0) {
Ok(body) => body,
Err(e) => return ready(e.error_response(req)),
Err(e) => return e.error_response(req).into(),
};
ready(
Response::build(StatusCode::OK)
.header(CONTENT_TYPE, "application/x-www-form-urlencoded")
.body(body),
)
Response::build(StatusCode::OK)
.header(CONTENT_TYPE, "application/x-www-form-urlencoded")
.body(body)
.into()
}
}
@ -162,9 +159,8 @@ where
///
/// ```rust
/// use ntex::web::{self, App, Error, FromRequest};
/// use serde_derive::Deserialize;
///
/// #[derive(Deserialize)]
/// #[derive(serde::Deserialize)]
/// struct FormData {
/// username: String,
/// }
@ -316,7 +312,7 @@ where
self.fut = Some(Box::pin(async move {
let mut body = BytesMut::with_capacity(8192);
while let Some(item) = stream.next().await {
while let Some(item) = next(&mut stream).await {
let chunk = item?;
if (body.len() + chunk.len()) > limit {
return Err(UrlencodedError::Overflow {

View file

@ -3,8 +3,6 @@
use std::{fmt, future::Future, ops, pin::Pin, sync::Arc, task::Context, task::Poll};
use bytes::BytesMut;
use futures::future::{ready, Ready};
use futures::StreamExt;
use serde::de::DeserializeOwned;
use serde::Serialize;
@ -12,8 +10,10 @@ use serde::Serialize;
use crate::http::encoding::Decoder;
use crate::http::header::CONTENT_LENGTH;
use crate::http::{HttpMessage, Payload, Response, StatusCode};
use crate::util::next;
use crate::web::error::{ErrorRenderer, JsonError, JsonPayloadError, WebResponseError};
use crate::web::{FromRequest, HttpRequest, Responder};
use crate::web::responder::{Ready, Responder};
use crate::web::{FromRequest, HttpRequest};
/// Json helper
///
@ -31,9 +31,8 @@ use crate::web::{FromRequest, HttpRequest, Responder};
///
/// ```rust
/// use ntex::web;
/// use serde_derive::Deserialize;
///
/// #[derive(Deserialize)]
/// #[derive(serde::Deserialize)]
/// struct Info {
/// username: String,
/// }
@ -58,9 +57,8 @@ use crate::web::{FromRequest, HttpRequest, Responder};
///
/// ```rust
/// use ntex::web;
/// use serde_derive::Serialize;
///
/// #[derive(Serialize)]
/// #[derive(serde::Serialize)]
/// struct MyObj {
/// name: String,
/// }
@ -123,14 +121,13 @@ where
fn respond_to(self, req: &HttpRequest) -> Self::Future {
let body = match serde_json::to_string(&self.0) {
Ok(body) => body,
Err(e) => return ready(e.error_response(req)),
Err(e) => return e.error_response(req).into(),
};
ready(
Response::build(StatusCode::OK)
.content_type("application/json")
.body(body),
)
Response::build(StatusCode::OK)
.content_type("application/json")
.body(body)
.into()
}
}
@ -147,9 +144,8 @@ where
///
/// ```rust
/// use ntex::web;
/// use serde_derive::Deserialize;
///
/// #[derive(Deserialize)]
/// #[derive(serde::Deserialize)]
/// struct Info {
/// username: String,
/// }
@ -203,9 +199,8 @@ where
/// ```rust
/// use ntex::http::error;
/// use ntex::web::{self, App, FromRequest, HttpResponse};
/// use serde_derive::Deserialize;
///
/// #[derive(Deserialize)]
/// #[derive(serde::Deserialize)]
/// struct Info {
/// username: String,
/// }
@ -362,7 +357,7 @@ where
self.fut = Some(Box::pin(async move {
let mut body = BytesMut::with_capacity(8192);
while let Some(item) = stream.next().await {
while let Some(item) = next(&mut stream).await {
let chunk = item?;
if (body.len() + chunk.len()) > limit {
return Err(JsonPayloadError::Overflow);
@ -380,13 +375,14 @@ where
#[cfg(test)]
mod tests {
use bytes::Bytes;
use serde_derive::{Deserialize, Serialize};
use super::*;
use crate::http::header;
use crate::web::test::{from_request, respond_to, TestRequest};
#[derive(Serialize, Deserialize, PartialEq, Debug, derive_more::Display)]
#[derive(
serde::Serialize, serde::Deserialize, PartialEq, Debug, derive_more::Display,
)]
struct MyObject {
name: String,
}

View file

@ -1,13 +1,11 @@
//! Path extractor
use std::{fmt, ops};
use futures::future::{ready, Ready};
use serde::de;
use crate::http::Payload;
use crate::router::PathDeserializer;
use crate::web::error::{ErrorRenderer, PathError};
use crate::web::{FromRequest, HttpRequest};
use crate::{http::Payload, router::PathDeserializer, util::Ready};
#[derive(PartialEq, Eq, PartialOrd, Ord)]
/// Extract typed information from the request's path.
@ -39,9 +37,8 @@ use crate::web::{FromRequest, HttpRequest};
///
/// ```rust
/// use ntex::web;
/// use serde_derive::Deserialize;
///
/// #[derive(Deserialize)]
/// #[derive(serde::Deserialize)]
/// struct Info {
/// username: String,
/// }
@ -134,9 +131,8 @@ impl<T: fmt::Display> fmt::Display for Path<T> {
///
/// ```rust
/// use ntex::web;
/// use serde_derive::Deserialize;
///
/// #[derive(Deserialize)]
/// #[derive(serde::Deserialize)]
/// struct Info {
/// username: String,
/// }
@ -158,11 +154,11 @@ where
T: de::DeserializeOwned,
{
type Error = PathError;
type Future = Ready<Result<Self, Self::Error>>;
type Future = Ready<Self, Self::Error>;
#[inline]
fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
ready(
Ready::result(
de::Deserialize::deserialize(PathDeserializer::new(req.match_info()))
.map(|inner| Path { inner })
.map_err(move |e| {
@ -180,20 +176,19 @@ where
#[cfg(test)]
mod tests {
use derive_more::Display;
use serde_derive::Deserialize;
use super::*;
use crate::router::Router;
use crate::web::test::{from_request, TestRequest};
#[derive(Deserialize, Debug, Display)]
#[derive(serde::Deserialize, Debug, Display)]
#[display(fmt = "MyStruct({}, {})", key, value)]
struct MyStruct {
key: String,
value: String,
}
#[derive(Deserialize)]
#[derive(serde::Deserialize)]
struct Test2 {
key: String,
value: u32,

View file

@ -3,12 +3,11 @@ use std::{future::Future, pin::Pin, str, task::Context, task::Poll};
use bytes::{Bytes, BytesMut};
use encoding_rs::UTF_8;
use futures::future::Either;
use futures::{Stream, StreamExt};
use futures_core::Stream;
use mime::Mime;
use crate::http::{error, header, HttpMessage};
use crate::util::Ready;
use crate::util::{next, Either, Ready};
use crate::web::error::{ErrorRenderer, PayloadError};
use crate::web::{FromRequest, HttpRequest};
@ -18,14 +17,14 @@ use crate::web::{FromRequest, HttpRequest};
///
/// ```rust
/// use bytes::BytesMut;
/// use futures::{Future, Stream, StreamExt};
/// use futures::{Future, Stream};
/// use ntex::web::{self, error, App, HttpResponse};
///
/// /// extract binary data from request
/// async fn index(mut body: web::types::Payload) -> Result<HttpResponse, error::PayloadError>
/// {
/// let mut bytes = BytesMut::new();
/// while let Some(item) = body.next().await {
/// while let Some(item) = ntex::util::next(&mut body).await {
/// bytes.extend_from_slice(&item?);
/// }
///
@ -68,14 +67,14 @@ impl Stream for Payload {
///
/// ```rust
/// use bytes::BytesMut;
/// use futures::{Future, Stream, StreamExt};
/// use futures::{Future, Stream};
/// use ntex::web::{self, error, App, Error, HttpResponse};
///
/// /// extract binary data from request
/// async fn index(mut body: web::types::Payload) -> Result<HttpResponse, error::PayloadError>
/// {
/// let mut bytes = BytesMut::new();
/// while let Some(item) = body.next().await {
/// while let Some(item) = ntex::util::next(&mut body).await {
/// bytes.extend_from_slice(&item?);
/// }
///
@ -393,7 +392,7 @@ impl Future for HttpMessageBody {
self.fut = Some(Box::pin(async move {
let mut body = BytesMut::with_capacity(8192);
while let Some(item) = stream.next().await {
while let Some(item) = next(&mut stream).await {
let chunk = item?;
if body.len() + chunk.len() > limit {
return Err(PayloadError::from(error::PayloadError::Overflow));
@ -410,7 +409,6 @@ impl Future for HttpMessageBody {
#[cfg(test)]
mod tests {
use bytes::Bytes;
use futures::StreamExt;
use super::*;
use crate::http::header;
@ -444,7 +442,7 @@ mod tests {
.await
.unwrap()
.into_inner();
let b = s.next().await.unwrap().unwrap();
let b = next(&mut s).await.unwrap().unwrap();
assert_eq!(b, Bytes::from_static(b"hello=world"));
}

View file

@ -20,15 +20,14 @@ use crate::{http::Payload, util::Ready};
///
/// ```rust
/// use ntex::web;
/// use serde_derive::Deserialize;
///
/// #[derive(Debug, Deserialize)]
/// #[derive(Debug, serde::Deserialize)]
/// pub enum ResponseType {
/// Token,
/// Code
/// }
///
/// #[derive(Deserialize)]
/// #[derive(serde::Deserialize)]
/// pub struct AuthRequest {
/// id: u64,
/// response_type: ResponseType,
@ -98,15 +97,14 @@ impl<T: fmt::Display> fmt::Display for Query<T> {
///
/// ```rust
/// use ntex::web;
/// use serde_derive::Deserialize;
///
/// #[derive(Debug, Deserialize)]
/// #[derive(Debug, serde::Deserialize)]
/// pub enum ResponseType {
/// Token,
/// Code
/// }
///
/// #[derive(Deserialize)]
/// #[derive(serde::Deserialize)]
/// pub struct AuthRequest {
/// id: u64,
/// response_type: ResponseType,
@ -153,12 +151,11 @@ where
#[cfg(test)]
mod tests {
use derive_more::Display;
use serde_derive::Deserialize;
use super::*;
use crate::web::test::{from_request, TestRequest};
#[derive(Deserialize, Debug, Display)]
#[derive(serde::Deserialize, Debug, Display)]
struct Id {
id: String,
}

View file

@ -1,7 +1,10 @@
use std::error::Error as StdError;
use std::{
error::Error as StdError, marker::PhantomData, pin::Pin, task::Context, task::Poll,
};
use bytes::Bytes;
use futures::{Sink, Stream, TryStreamExt};
use futures_core::Stream;
use futures_sink::Sink;
pub use crate::ws::{CloseCode, CloseReason, Frame, Message};
@ -84,10 +87,10 @@ where
// start websockets service dispatcher
rt::spawn(crate::util::stream::Dispatcher::new(
// wrap bytes stream to ws::Frame's stream
ws::StreamDecoder::new(payload).map_err(|e| {
let e: Box<dyn StdError> = Box::new(e);
e
}),
MapStream {
stream: ws::StreamDecoder::new(payload),
_t: PhantomData,
},
// converter wraper from ws::Message to Bytes
sink,
// websockets handler service
@ -96,3 +99,35 @@ where
Ok(res.body(Body::from_message(BoxedBodyStream::new(rx))))
}
pin_project_lite::pin_project! {
struct MapStream<S, I, E>{
#[pin]
stream: S,
_t: PhantomData<(I, E)>,
}
}
impl<S, I, E> Stream for MapStream<S, I, E>
where
S: Stream<Item = Result<I, E>>,
E: StdError + 'static,
{
type Item = Result<I, Box<dyn StdError>>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match self.project().stream.poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(Ok(item))),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(Box::new(err)))),
Poll::Ready(None) => Poll::Ready(None),
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}

View file

@ -2,6 +2,7 @@ use std::convert::TryFrom;
use bytes::{Buf, BufMut, BytesMut};
use log::debug;
use nanorand::{WyRand, RNG};
use super::mask::apply_mask;
use super::proto::{CloseCode, CloseReason, OpCode};
@ -187,7 +188,7 @@ impl Parser {
};
if mask {
let mask = rand::random::<u32>();
let mask: u32 = WyRand::new().generate();
dst.put_u32_le(mask);
dst.extend_from_slice(payload.as_ref());
let pos = dst.len() - payload_len;

View file

@ -3,10 +3,10 @@ use std::{
};
use bytes::{Bytes, BytesMut};
use futures::{Sink, Stream};
use ntex_codec::{Decoder, Encoder};
use super::{Codec, Frame, Message, ProtocolError};
use crate::{Sink, Stream};
/// Stream error
#[derive(Debug, Display)]
@ -171,10 +171,9 @@ where
#[cfg(test)]
mod tests {
use bytestring::ByteString;
use futures::{SinkExt, StreamExt};
use super::*;
use crate::channel::mpsc;
use crate::{channel::mpsc, util::next, util::poll_fn, util::send};
#[crate::rt_test]
async fn test_decoder() {
@ -191,12 +190,12 @@ mod tests {
.unwrap();
tx.send(Ok::<_, ()>(buf.split().freeze())).unwrap();
let frame = StreamExt::next(&mut decoder).await.unwrap().unwrap();
let frame = next(&mut decoder).await.unwrap().unwrap();
match frame {
Frame::Text(data) => assert_eq!(data, b"test1"[..]),
_ => panic!(),
}
let frame = StreamExt::next(&mut decoder).await.unwrap().unwrap();
let frame = next(&mut decoder).await.unwrap().unwrap();
match frame {
Frame::Text(data) => assert_eq!(data, b"test2"[..]),
_ => panic!(),
@ -208,15 +207,21 @@ mod tests {
let (tx, mut rx) = mpsc::channel();
let mut encoder = StreamEncoder::new(tx);
encoder
.send(Ok::<_, ()>(Message::Text(ByteString::from_static("test"))))
send(
&mut encoder,
Ok::<_, ()>(Message::Text(ByteString::from_static("test"))),
)
.await
.unwrap();
poll_fn(|cx| Pin::new(&mut encoder).poll_flush(cx))
.await
.unwrap();
poll_fn(|cx| Pin::new(&mut encoder).poll_close(cx))
.await
.unwrap();
encoder.flush().await.unwrap();
encoder.close().await.unwrap();
let data = rx.next().await.unwrap().unwrap();
let data = next(&mut rx).await.unwrap().unwrap();
assert_eq!(data, b"\x81\x04test".as_ref());
assert!(rx.next().await.is_none());
assert!(next(&mut rx).await.is_none());
}
}

View file

@ -63,7 +63,7 @@ async fn test_connection_reuse_h2() {
builder.set_verify(SslVerifyMode::NONE);
let _ = builder
.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(|e| log::error!("Can not set alpn protocol: {:?}", e));
.map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e));
let client = Client::build()
.connector(Connector::default().openssl(builder.build()).finish())

View file

@ -50,7 +50,7 @@ fn ssl_acceptor() -> SslAcceptor {
});
builder
.set_alpn_protos(b"\x08http/1.1\x02h2")
.expect("Can not contrust SslAcceptor");
.expect("Cannot contrust SslAcceptor");
builder.build()
}

View file

@ -87,7 +87,7 @@ fn client() -> ntex::http::client::Client {
builder.set_verify(SslVerifyMode::NONE);
let _ = builder
.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(|e| log::error!("Can not set alpn protocol: {:?}", e));
.map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e));
ntex::http::client::Client::build()
.timeout(Duration::from_millis(30000))