From cb9e3ffeda3f80dcad274ce63af3d81368190e73 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 3 Apr 2021 13:25:20 +0600 Subject: [PATCH] reduce futures crate dependencies --- ntex-codec/Cargo.toml | 10 +- ntex-codec/src/framed.rs | 2 +- ntex-rt/CHANGES.md | 4 + ntex-rt/Cargo.toml | 6 +- ntex-rt/src/arbiter.rs | 38 ++-- ntex-rt/src/builder.rs | 18 +- ntex-rt/src/lib.rs | 10 +- ntex-rt/src/system.rs | 8 +- ntex-service/src/either.rs | 208 ++++++++++++++++++++++ ntex-service/src/lib.rs | 2 + ntex/CHANGES.md | 6 +- ntex/Cargo.toml | 22 +-- ntex/src/channel/condition.rs | 6 +- ntex/src/channel/mod.rs | 13 ++ ntex/src/channel/mpsc.rs | 25 ++- ntex/src/channel/oneshot.rs | 10 +- ntex/src/channel/pool.rs | 9 +- ntex/src/connect/message.rs | 2 +- ntex/src/connect/resolve.rs | 9 +- ntex/src/framed/dispatcher.rs | 3 +- ntex/src/framed/state.rs | 8 +- ntex/src/framed/write.rs | 4 +- ntex/src/http/body.rs | 6 +- ntex/src/http/client/connection.rs | 3 +- ntex/src/http/client/connector.rs | 8 +- ntex/src/http/client/error.rs | 2 +- ntex/src/http/client/frozen.rs | 2 +- ntex/src/http/client/h1proto.rs | 22 +-- ntex/src/http/client/h2proto.rs | 2 +- ntex/src/http/client/pool.rs | 17 +- ntex/src/http/client/request.rs | 2 +- ntex/src/http/client/response.rs | 2 +- ntex/src/http/client/sender.rs | 7 +- ntex/src/http/client/test.rs | 2 +- ntex/src/http/client/ws.rs | 14 +- ntex/src/http/config.rs | 14 +- ntex/src/http/encoding/decoder.rs | 16 +- ntex/src/http/encoding/encoder.rs | 20 +-- ntex/src/http/error.rs | 10 +- ntex/src/http/h1/dispatcher.rs | 43 +++-- ntex/src/http/h1/payload.rs | 8 +- ntex/src/http/h1/upgrade.rs | 9 +- ntex/src/http/h2/mod.rs | 2 +- ntex/src/http/payload.rs | 2 +- ntex/src/http/response.rs | 3 +- ntex/src/http/test.rs | 4 +- ntex/src/lib.rs | 3 + ntex/src/server/accept.rs | 16 +- ntex/src/server/builder.rs | 36 ++-- ntex/src/server/config.rs | 4 +- ntex/src/server/mod.rs | 23 ++- ntex/src/server/openssl.rs | 8 +- ntex/src/server/rustls.rs | 8 +- ntex/src/server/service.rs | 2 +- ntex/src/server/signals.rs | 2 +- ntex/src/server/worker.rs | 52 +++--- ntex/src/testing.rs | 46 +++-- ntex/src/util/buffer.rs | 12 +- ntex/src/util/inflight.rs | 7 +- ntex/src/util/keepalive.rs | 3 +- ntex/src/util/mod.rs | 157 +++++++++++++++- ntex/src/util/sink.rs | 9 +- ntex/src/util/stream.rs | 35 ++-- ntex/src/util/time.rs | 2 +- ntex/src/util/timeout.rs | 16 +- ntex/src/util/variant.rs | 16 +- ntex/src/web/app.rs | 37 ++-- ntex/src/web/error.rs | 31 ++-- ntex/src/web/extract.rs | 9 +- ntex/src/web/httprequest.rs | 3 +- ntex/src/web/middleware/compress.rs | 6 +- ntex/src/web/middleware/defaultheaders.rs | 7 +- ntex/src/web/middleware/logger.rs | 50 +++--- ntex/src/web/mod.rs | 1 - ntex/src/web/resource.rs | 10 +- ntex/src/web/responder.rs | 92 ++++++---- ntex/src/web/route.rs | 9 +- ntex/src/web/scope.rs | 10 +- ntex/src/web/server.rs | 31 ++-- ntex/src/web/service.rs | 10 +- ntex/src/web/test.rs | 15 +- ntex/src/web/types/form.rs | 28 ++- ntex/src/web/types/json.rs | 36 ++-- ntex/src/web/types/path.rs | 19 +- ntex/src/web/types/payload.rs | 18 +- ntex/src/web/types/query.rs | 13 +- ntex/src/web/ws.rs | 47 ++++- ntex/src/ws/frame.rs | 3 +- ntex/src/ws/stream.rs | 27 +-- ntex/tests/http_awc_openssl_client.rs | 2 +- ntex/tests/http_openssl.rs | 2 +- ntex/tests/web_httpserver.rs | 2 +- 92 files changed, 1004 insertions(+), 614 deletions(-) create mode 100644 ntex-service/src/either.rs diff --git a/ntex-codec/Cargo.toml b/ntex-codec/Cargo.toml index 7b9b5703..259f2cc7 100644 --- a/ntex-codec/Cargo.toml +++ b/ntex-codec/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-codec" -version = "0.4.0" +version = "0.4.1" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -18,12 +18,12 @@ path = "src/lib.rs" [dependencies] bitflags = "1.2.1" bytes = "1.0" -either = "1.6.1" -futures-core = "0.3.12" -futures-sink = "0.3.12" +ntex-service = "0.1.7" +futures-core = { version = "0.3.13", default-features = false, features = ["alloc"] } +futures-sink = { version = "0.3.13", default-features = false, features = ["alloc"] } log = "0.4" tokio = { version = "1", default-features=false } [dev-dependencies] -ntex = "0.3.0-b.1" +ntex = "0.3.13" futures = "0.3.13" diff --git a/ntex-codec/src/framed.rs b/ntex-codec/src/framed.rs index 534c51a1..8e85d53e 100644 --- a/ntex-codec/src/framed.rs +++ b/ntex-codec/src/framed.rs @@ -3,9 +3,9 @@ use std::task::{Context, Poll}; use std::{fmt, io}; use bytes::{Buf, BytesMut}; -use either::Either; use futures_core::{ready, Stream}; use futures_sink::Sink; +use ntex_service::util::Either; use crate::{AsyncRead, AsyncWrite, Decoder, Encoder}; diff --git a/ntex-rt/CHANGES.md b/ntex-rt/CHANGES.md index 4fb70b6c..1e92fdcb 100644 --- a/ntex-rt/CHANGES.md +++ b/ntex-rt/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.2.2] - 2021-04-03 + +* precise futures crate dependency + ## [0.2.1] - 2021-02-25 * Drop macros diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index 2e2bebb1..065c0da8 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-rt" -version = "0.2.1" +version = "0.2.2" authors = ["ntex contributors "] description = "ntex runtime" keywords = ["network", "framework", "async", "futures"] @@ -16,5 +16,5 @@ name = "ntex_rt" path = "src/lib.rs" [dependencies] -futures = "0.3.13" -tokio = { version = "1", default-features=false, features = ["rt", "net", "time", "signal"] } +ntex-service = "0.1.7" +tokio = { version = "1", default-features=false, features = ["rt", "net", "time", "signal", "sync"] } diff --git a/ntex-rt/src/arbiter.rs b/ntex-rt/src/arbiter.rs index 76b4dd50..e8eea0bd 100644 --- a/ntex-rt/src/arbiter.rs +++ b/ntex-rt/src/arbiter.rs @@ -1,14 +1,10 @@ use std::any::{Any, TypeId}; -use std::cell::RefCell; -use std::collections::HashMap; -use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; -use std::{fmt, thread}; +use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, thread}; -use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; -use futures::channel::oneshot::{channel, Canceled, Sender}; -use futures::{Future, Stream}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot::{channel, error::RecvError, Sender}; use tokio::task::LocalSet; use super::runtime::Runtime; @@ -56,7 +52,7 @@ impl Default for Arbiter { impl Arbiter { pub(super) fn new_system(local: &LocalSet) -> Self { - let (tx, rx) = unbounded(); + let (tx, rx) = unbounded_channel(); let arb = Arbiter::with_sender(tx); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); @@ -78,7 +74,7 @@ impl Arbiter { /// Stop arbiter from continuing it's event loop. pub fn stop(&self) { - let _ = self.sender.unbounded_send(ArbiterCommand::Stop); + let _ = self.sender.send(ArbiterCommand::Stop); } /// Spawn new thread and run event loop in spawned thread. @@ -87,13 +83,13 @@ impl Arbiter { let id = COUNT.fetch_add(1, Ordering::Relaxed); let name = format!("ntex-rt:worker:{}", id); let sys = System::current(); - let (arb_tx, arb_rx) = unbounded(); + let (arb_tx, arb_rx) = unbounded_channel(); let arb_tx2 = arb_tx.clone(); let handle = thread::Builder::new() .name(name.clone()) .spawn(move || { - let rt = Runtime::new().expect("Can not create Runtime"); + let rt = Runtime::new().expect("Cannot create Runtime"); let arb = Arbiter::with_sender(arb_tx); let (stop, stop_rx) = channel(); @@ -111,7 +107,7 @@ impl Arbiter { // register arbiter let _ = System::current() .sys() - .unbounded_send(SystemCommand::RegisterArbiter(id, arb)); + .send(SystemCommand::RegisterArbiter(id, arb)); // run loop let _ = rt.block_on(stop_rx); @@ -119,7 +115,7 @@ impl Arbiter { // unregister arbiter let _ = System::current() .sys() - .unbounded_send(SystemCommand::UnregisterArbiter(id)); + .send(SystemCommand::UnregisterArbiter(id)); }) .unwrap_or_else(|err| { panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err) @@ -136,15 +132,13 @@ impl Arbiter { where F: Future + Send + Unpin + 'static, { - let _ = self - .sender - .unbounded_send(ArbiterCommand::Execute(Box::new(future))); + let _ = self.sender.send(ArbiterCommand::Execute(Box::new(future))); } /// Send a function to the Arbiter's thread. This function will be executed asynchronously. /// A future is created, and when resolved will contain the result of the function sent /// to the Arbiters thread. - pub fn exec(&self, f: F) -> impl Future> + pub fn exec(&self, f: F) -> impl Future> where F: FnOnce() -> R + Send + 'static, R: Send + 'static, @@ -152,8 +146,8 @@ impl Arbiter { let (tx, rx) = channel(); let _ = self .sender - .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { - if !tx.is_canceled() { + .send(ArbiterCommand::ExecuteFn(Box::new(move || { + if !tx.is_closed() { let _ = tx.send(f()); } }))); @@ -168,7 +162,7 @@ impl Arbiter { { let _ = self .sender - .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { + .send(ArbiterCommand::ExecuteFn(Box::new(move || { f(); }))); } @@ -261,7 +255,7 @@ impl Future for ArbiterController { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match Pin::new(&mut self.rx).poll_next(cx) { + match Pin::new(&mut self.rx).poll_recv(cx) { Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(Some(item)) => match item { ArbiterCommand::Stop => { @@ -315,7 +309,7 @@ impl Future for SystemArbiter { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match Pin::new(&mut self.commands).poll_next(cx) { + match Pin::new(&mut self.commands).poll_recv(cx) { Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(Some(cmd)) => match cmd { SystemCommand::Exit(code) => { diff --git a/ntex-rt/src/builder.rs b/ntex-rt/src/builder.rs index 7fe8e469..fecfae4c 100644 --- a/ntex-rt/src/builder.rs +++ b/ntex-rt/src/builder.rs @@ -1,9 +1,8 @@ -use std::borrow::Cow; -use std::io; +use std::{borrow::Cow, future::Future, io}; -use futures::channel::mpsc::unbounded; -use futures::channel::oneshot::{channel, Receiver}; -use futures::future::{lazy, Future, FutureExt}; +use ntex_service::util::lazy; +use tokio::sync::mpsc::unbounded_channel; +use tokio::sync::oneshot::{channel, Receiver}; use tokio::task::LocalSet; use super::arbiter::{Arbiter, SystemArbiter}; @@ -74,7 +73,7 @@ impl Builder { fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner { let (stop_tx, stop) = channel(); - let (sys_sender, sys_receiver) = unbounded(); + let (sys_sender, sys_receiver) = unbounded_channel(); let system = System::construct( sys_sender, @@ -96,7 +95,7 @@ impl Builder { F: FnOnce() + 'static, { let (stop_tx, stop) = channel(); - let (sys_sender, sys_receiver) = unbounded(); + let (sys_sender, sys_receiver) = unbounded_channel(); let rt = Runtime::new().unwrap(); @@ -129,7 +128,7 @@ impl AsyncSystemRunner { let AsyncSystemRunner { stop, .. } = self; // run loop - lazy(|_| async { + async move { match stop.await { Ok(code) => { if code != 0 { @@ -143,8 +142,7 @@ impl AsyncSystemRunner { } Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), } - }) - .flatten() + } } } diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs index fca34fa0..4007f2c5 100644 --- a/ntex-rt/src/lib.rs +++ b/ntex-rt/src/lib.rs @@ -1,5 +1,6 @@ //! A runtime implementation that runs everything on the current thread. -use futures::future::{self, Future, FutureExt}; +use ntex_service::util::lazy; +use std::future::Future; mod arbiter; mod builder; @@ -21,7 +22,7 @@ pub use self::system::System; #[inline] pub fn spawn(f: F) -> self::task::JoinHandle where - F: futures::Future + 'static, + F: Future + 'static, { tokio::task::spawn_local(f) } @@ -39,7 +40,10 @@ where F: FnOnce() -> R + 'static, R: Future + 'static, { - tokio::task::spawn_local(future::lazy(|_| f()).flatten()) + tokio::task::spawn_local(async move { + let r = lazy(|_| f()).await; + r.await + }) } /// Asynchronous signal handling diff --git a/ntex-rt/src/system.rs b/ntex-rt/src/system.rs index c83e4af5..6f8cddd0 100644 --- a/ntex-rt/src/system.rs +++ b/ntex-rt/src/system.rs @@ -1,8 +1,6 @@ -use std::cell::RefCell; -use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; - -use futures::channel::mpsc::UnboundedSender; +use std::{cell::RefCell, io}; +use tokio::sync::mpsc::UnboundedSender; use super::arbiter::{Arbiter, SystemCommand}; use super::builder::{Builder, SystemRunner}; @@ -83,7 +81,7 @@ impl System { /// Stop the system with a particular exit code. pub fn stop_with_code(&self, code: i32) { - let _ = self.sys.unbounded_send(SystemCommand::Exit(code)); + let _ = self.sys.send(SystemCommand::Exit(code)); } pub(super) fn sys(&self) -> &UnboundedSender { diff --git a/ntex-service/src/either.rs b/ntex-service/src/either.rs new file mode 100644 index 00000000..7fc1566c --- /dev/null +++ b/ntex-service/src/either.rs @@ -0,0 +1,208 @@ +use std::{error, fmt, future::Future, pin::Pin, task::Context, task::Poll}; + +/// Combines two different futures, streams, or sinks having the same associated types into a single +/// type. +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] +pub enum Either { + /// First branch of the type + Left(/* #[pin] */ A), + /// Second branch of the type + Right(/* #[pin] */ B), +} + +impl Either { + fn project(self: Pin<&mut Self>) -> Either, Pin<&mut B>> { + unsafe { + match self.get_unchecked_mut() { + Either::Left(a) => Either::Left(Pin::new_unchecked(a)), + Either::Right(b) => Either::Right(Pin::new_unchecked(b)), + } + } + } + + #[inline] + /// Return true if the value is the `Left` variant. + /// + /// ``` + /// use either::*; + /// + /// let values = [Left(1), Right("the right value")]; + /// assert_eq!(values[0].is_left(), true); + /// assert_eq!(values[1].is_left(), false); + /// ``` + pub fn is_left(&self) -> bool { + match *self { + Either::Left(_) => true, + Either::Right(_) => false, + } + } + + #[inline] + /// Return true if the value is the `Right` variant. + /// + /// ``` + /// use either::*; + /// + /// let values = [Left(1), Right("the right value")]; + /// assert_eq!(values[0].is_right(), false); + /// assert_eq!(values[1].is_right(), true); + /// ``` + pub fn is_right(&self) -> bool { + !self.is_left() + } + + #[inline] + /// Convert the left side of `Either` to an `Option`. + /// + /// ``` + /// use either::*; + /// + /// let left: Either<_, ()> = Left("some value"); + /// assert_eq!(left.left(), Some("some value")); + /// + /// let right: Either<(), _> = Right(321); + /// assert_eq!(right.left(), None); + /// ``` + pub fn left(self) -> Option { + match self { + Either::Left(l) => Some(l), + Either::Right(_) => None, + } + } + + #[inline] + /// Convert the right side of `Either` to an `Option`. + /// + /// ``` + /// use either::*; + /// + /// let left: Either<_, ()> = Left("some value"); + /// assert_eq!(left.right(), None); + /// + /// let right: Either<(), _> = Right(321); + /// assert_eq!(right.right(), Some(321)); + /// ``` + pub fn right(self) -> Option { + match self { + Either::Left(_) => None, + Either::Right(r) => Some(r), + } + } + + /// Convert `&Either` to `Either<&L, &R>`. + /// + /// ``` + /// use either::*; + /// + /// let left: Either<_, ()> = Left("some value"); + /// assert_eq!(left.as_ref(), Left(&"some value")); + /// + /// let right: Either<(), _> = Right("some value"); + /// assert_eq!(right.as_ref(), Right(&"some value")); + /// ``` + pub fn as_ref(&self) -> Either<&A, &B> { + match *self { + Either::Left(ref inner) => Either::Left(inner), + Either::Right(ref inner) => Either::Right(inner), + } + } + + /// Convert `&mut Either` to `Either<&mut L, &mut R>`. + /// + /// ``` + /// use either::*; + /// + /// fn mutate_left(value: &mut Either) { + /// if let Some(l) = value.as_mut().left() { + /// *l = 999; + /// } + /// } + /// + /// let mut left = Left(123); + /// let mut right = Right(123); + /// mutate_left(&mut left); + /// mutate_left(&mut right); + /// assert_eq!(left, Left(999)); + /// assert_eq!(right, Right(123)); + /// ``` + pub fn as_mut(&mut self) -> Either<&mut A, &mut B> { + match *self { + Either::Left(ref mut inner) => Either::Left(inner), + Either::Right(ref mut inner) => Either::Right(inner), + } + } +} + +impl Either<(T, A), (T, B)> { + #[inline] + /// Factor out a homogeneous type from an either of pairs. + /// + /// Here, the homogeneous type is the first element of the pairs. + pub fn factor_first(self) -> (T, Either) { + match self { + Either::Left((x, a)) => (x, Either::Left(a)), + Either::Right((x, b)) => (x, Either::Right(b)), + } + } +} + +impl Either<(A, T), (B, T)> { + #[inline] + /// Factor out a homogeneous type from an either of pairs. + /// + /// Here, the homogeneous type is the second element of the pairs. + pub fn factor_second(self) -> (Either, T) { + match self { + Either::Left((a, x)) => (Either::Left(a), x), + Either::Right((b, x)) => (Either::Right(b), x), + } + } +} + +impl Either { + #[inline] + /// Extract the value of an either over two equivalent types. + pub fn into_inner(self) -> T { + match self { + Either::Left(x) => x, + Either::Right(x) => x, + } + } +} + +/// `Either` implements `Error` if *both* `A` and `B` implement it. +impl error::Error for Either +where + A: error::Error, + B: error::Error, +{ +} + +impl fmt::Display for Either +where + A: fmt::Display, + B: fmt::Display, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Either::Left(a) => a.fmt(f), + Either::Right(b) => b.fmt(f), + } + } +} + +impl Future for Either +where + A: Future, + B: Future, +{ + type Output = A::Output; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.project() { + Either::Left(x) => x.poll(cx), + Either::Right(x) => x.poll(cx), + } + } +} diff --git a/ntex-service/src/lib.rs b/ntex-service/src/lib.rs index 697b2dd8..774199ed 100644 --- a/ntex-service/src/lib.rs +++ b/ntex-service/src/lib.rs @@ -21,6 +21,7 @@ mod then; mod transform; mod transform_err; +mod either; mod lazy; mod ready; @@ -341,6 +342,7 @@ where } pub mod util { + pub use crate::either::Either; pub use crate::lazy::{lazy, Lazy}; pub use crate::ready::Ready; } diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 7113ed56..7c42d0e8 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,10 +1,12 @@ # Changes -## [0.3.14] - 2021-04-xx +## [0.3.14] - 2021-04-03 * server: prevent double socket registration if accept loop is in back-pressure state -* util: add custom Ready future +* util: add custom Ready, Either future and several helper functions + +* reduce futures crate dependencies ## [0.3.13] - 2021-03-26 diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 6c0d81c8..199b24d6 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.3.13" +version = "0.3.14" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -36,10 +36,10 @@ compress = ["flate2", "brotli2"] cookie = ["coo-kie", "coo-kie/percent-encode"] [dependencies] -ntex-codec = "0.4.0" -ntex-rt = "0.2.1" +ntex-codec = "0.4.1" +ntex-rt = "0.2.2" ntex-router = "0.4.2" -ntex-service = "0.1.6" +ntex-service = "0.1.7" ntex-macros = "0.1.3" base64 = "0.13" @@ -47,20 +47,21 @@ bitflags = "1.2" bytes = "1.0" bytestring = { version = "1.0", features = ["serde"] } derive_more = "0.99.13" -either = "1.6.1" encoding_rs = "0.8" -futures = "0.3.13" +futures-core = { version = "0.3.13", default-features = false, features = ["alloc"] } +futures-sink = { version = "0.3.13", default-features = false, features = ["alloc"] } ahash = "0.7.2" h2 = "0.3" http = "0.2" httparse = "1.3" +httpdate = "1.0" log = "0.4" mime = "0.3" mio = "0.7.10" num_cpus = "1.13" +nanorand = "0.5" percent-encoding = "2.1" pin-project-lite = "0.2" -rand = "0.8" regex = "1.4" sha-1 = "0.9" slab = "0.4" @@ -70,8 +71,7 @@ serde_urlencoded = "0.7" socket2 = "0.4" url = "2.1" coo-kie = { version = "0.15", package = "cookie", optional = true } -time = { version = "0.2", default-features = false, features = ["std"] } -tokio = { version = "1", default-features=false } +tokio = { version = "1", default-features=false, features = ["sync"] } # resolver trust-dns-proto = { version = "0.20", default-features = false } @@ -93,7 +93,9 @@ flate2 = { version = "1.0.20", optional = true } [dev-dependencies] env_logger = "0.8" -serde_derive = "1.0" +rand = "0.8" +time = "0.2" open-ssl = { version="0.10", package = "openssl" } rust-tls = { version = "0.19", package="rustls", features = ["dangerous_configuration"] } webpki = "0.21" +futures = "0.3.13" diff --git a/ntex/src/channel/condition.rs b/ntex/src/channel/condition.rs index aefd0d30..e1f0537a 100644 --- a/ntex/src/channel/condition.rs +++ b/ntex/src/channel/condition.rs @@ -1,6 +1,4 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::{future::Future, pin::Pin, task::Context, task::Poll}; use slab::Slab; @@ -106,7 +104,7 @@ impl Drop for Waiter { #[cfg(test)] mod tests { use super::*; - use futures::future::lazy; + use crate::util::lazy; #[crate::rt_test] #[allow(clippy::unit_cmp)] diff --git a/ntex/src/channel/mod.rs b/ntex/src/channel/mod.rs index 75fa895f..243aea2a 100644 --- a/ntex/src/channel/mod.rs +++ b/ntex/src/channel/mod.rs @@ -5,3 +5,16 @@ pub mod condition; pub mod mpsc; pub mod oneshot; pub mod pool; + +/// Error returned from a [`Receiver`](Receiver) when the corresponding +/// [`Sender`](Sender) is dropped. +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub struct Canceled; + +impl std::fmt::Display for Canceled { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::write!(f, "oneshot canceled") + } +} + +impl std::error::Error for Canceled {} diff --git a/ntex/src/channel/mpsc.rs b/ntex/src/channel/mpsc.rs index 6e0b2665..a0d69779 100644 --- a/ntex/src/channel/mpsc.rs +++ b/ntex/src/channel/mpsc.rs @@ -1,11 +1,8 @@ //! A multi-producer, single-consumer, futures-aware, FIFO queue. -use std::collections::VecDeque; -use std::error::Error; -use std::fmt; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::{collections::VecDeque, fmt, pin::Pin, task::Context, task::Poll}; -use futures::{Sink, Stream}; +use futures_core::Stream; +use futures_sink::Sink; use super::cell::{Cell, WeakCell}; use crate::task::LocalWaker; @@ -209,7 +206,7 @@ impl Drop for Receiver { /// dropped pub struct SendError(T); -impl Error for SendError {} +impl std::error::Error for SendError {} impl fmt::Debug for SendError { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -233,8 +230,8 @@ impl SendError { #[cfg(test)] mod tests { use super::*; - use futures::future::lazy; - use futures::{Sink, Stream, StreamExt}; + use crate::{util::lazy, util::next, Stream}; + use futures_sink::Sink; #[crate::rt_test] async fn test_mpsc() { @@ -243,11 +240,11 @@ mod tests { assert!(format!("{:?}", rx).contains("Receiver")); tx.send("test").unwrap(); - assert_eq!(rx.next().await.unwrap(), "test"); + assert_eq!(next(&mut rx).await.unwrap(), "test"); let tx2 = tx.clone(); tx2.send("test2").unwrap(); - assert_eq!(rx.next().await.unwrap(), "test2"); + assert_eq!(next(&mut rx).await.unwrap(), "test2"); assert_eq!( lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await, @@ -262,7 +259,7 @@ mod tests { let (tx, mut rx) = channel::(); tx.close(); - assert_eq!(rx.next().await, None); + assert_eq!(next(&mut rx).await, None); let (tx, _rx) = channel::(); let weak_tx = tx.downgrade(); @@ -295,8 +292,8 @@ mod tests { assert!(Pin::new(&mut tx).poll_close(cx).is_ready()); }) .await; - assert_eq!(rx.next().await.unwrap(), "test"); - assert_eq!(rx.next().await, None); + assert_eq!(next(&mut rx).await.unwrap(), "test"); + assert_eq!(next(&mut rx).await, None); } #[crate::rt_test] diff --git a/ntex/src/channel/oneshot.rs b/ntex/src/channel/oneshot.rs index fcfb8e2c..391bd821 100644 --- a/ntex/src/channel/oneshot.rs +++ b/ntex/src/channel/oneshot.rs @@ -1,11 +1,7 @@ //! A one-shot, futures-aware channel. -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::{future::Future, pin::Pin, task::Context, task::Poll}; -pub use futures::channel::oneshot::Canceled; - -use super::cell::Cell; +use super::{cell::Cell, Canceled}; use crate::task::LocalWaker; /// Creates a new futures-aware, one-shot channel. @@ -105,7 +101,7 @@ impl Future for Receiver { #[cfg(test)] mod tests { use super::*; - use futures::future::lazy; + use crate::util::lazy; #[crate::rt_test] async fn test_oneshot() { diff --git a/ntex/src/channel/pool.rs b/ntex/src/channel/pool.rs index a96502b5..28750b5e 100644 --- a/ntex/src/channel/pool.rs +++ b/ntex/src/channel/pool.rs @@ -1,12 +1,9 @@ //! A one-shot pool, futures-aware channel. -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::{future::Future, pin::Pin, task::Context, task::Poll}; -pub use futures::channel::oneshot::Canceled; use slab::Slab; -use super::cell::Cell; +use super::{cell::Cell, Canceled}; use crate::task::LocalWaker; /// Creates a new futures-aware, pool of one-shot's. @@ -179,7 +176,7 @@ impl Future for Receiver { #[cfg(test)] mod tests { use super::*; - use futures::future::lazy; + use crate::util::lazy; #[crate::rt_test] async fn test_pool() { diff --git a/ntex/src/connect/message.rs b/ntex/src/connect/message.rs index 2d36a738..f0059f95 100644 --- a/ntex/src/connect/message.rs +++ b/ntex/src/connect/message.rs @@ -3,7 +3,7 @@ use std::fmt; use std::iter::{FromIterator, FusedIterator}; use std::net::SocketAddr; -use either::Either; +use crate::util::Either; /// Connect request pub trait Address: Unpin + 'static { diff --git a/ntex/src/connect/resolve.rs b/ntex/src/connect/resolve.rs index 197751de..d545f32e 100644 --- a/ntex/src/connect/resolve.rs +++ b/ntex/src/connect/resolve.rs @@ -3,11 +3,9 @@ use std::{ task::Context, task::Poll, }; -use futures::future::Either; - use super::{default_resolver, Address, Connect, ConnectError, DnsResolver}; use crate::service::{Service, ServiceFactory}; -use crate::util::Ready; +use crate::util::{Either, Ready}; /// DNS Resolver Service pub struct Resolver { @@ -42,7 +40,7 @@ impl Resolver { if req.addr.is_some() || req.req.addr().is_some() { Either::Right(Ready::ok(req)) } else if let Ok(ip) = req.host().parse() { - req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port()))); + req.addr = Some(Either::Left(SocketAddr::new(ip, req.port()))); Either::Right(Ready::ok(req)) } else { trace!("DNS resolver: resolving host {:?}", req.host()); @@ -138,9 +136,8 @@ impl Service for Resolver { #[cfg(test)] mod tests { - use futures::future::lazy; - use super::*; + use crate::util::lazy; #[crate::rt_test] async fn resolver() { diff --git a/ntex/src/framed/dispatcher.rs b/ntex/src/framed/dispatcher.rs index a957246d..d0442008 100644 --- a/ntex/src/framed/dispatcher.rs +++ b/ntex/src/framed/dispatcher.rs @@ -5,11 +5,10 @@ use std::{ time::Instant, }; -use either::Either; - use crate::codec::{AsyncRead, AsyncWrite, Decoder, Encoder}; use crate::framed::{DispatchItem, Read, ReadTask, State, Timer, Write, WriteTask}; use crate::service::{IntoService, Service}; +use crate::util::Either; type Response = ::Item; diff --git a/ntex/src/framed/state.rs b/ntex/src/framed/state.rs index d845726d..561032a7 100644 --- a/ntex/src/framed/state.rs +++ b/ntex/src/framed/state.rs @@ -2,12 +2,11 @@ use std::task::{Context, Poll, Waker}; use std::{cell::Cell, cell::RefCell, future::Future, hash, io, mem, pin::Pin, rc::Rc}; -use futures::future::poll_fn; use slab::Slab; use crate::codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts}; use crate::task::LocalWaker; -use crate::util::{Buf, BytesMut, Either}; +use crate::util::{poll_fn, Buf, BytesMut, Either}; bitflags::bitflags! { pub struct Flags: u16 { @@ -1110,12 +1109,9 @@ impl Drop for OnDisconnect { #[cfg(test)] mod tests { use bytes::Bytes; - use futures::future::lazy; - - use crate::codec::BytesCodec; - use crate::testing::Io; use super::*; + use crate::{codec::BytesCodec, testing::Io, util::lazy}; const BIN: &[u8] = b"GET /test HTTP/1\r\n\r\n"; const TEXT: &str = "GET /test HTTP/1\r\n\r\n"; diff --git a/ntex/src/framed/write.rs b/ntex/src/framed/write.rs index 13ab8a17..c5e7e407 100644 --- a/ntex/src/framed/write.rs +++ b/ntex/src/framed/write.rs @@ -165,7 +165,9 @@ where // disconnect timeout if let Some(ref mut delay) = delay { - futures::ready!(Pin::new(delay).poll(cx)); + if let Poll::Pending = Pin::new(delay).poll(cx) { + return Poll::Pending; + } } this.state.set_wr_shutdown_complete(); log::trace!("write task is stopped after delay"); diff --git a/ntex/src/http/body.rs b/ntex/src/http/body.rs index cac64d28..b83b67c6 100644 --- a/ntex/src/http/body.rs +++ b/ntex/src/http/body.rs @@ -3,7 +3,7 @@ use std::{ }; use bytes::{Bytes, BytesMut}; -use futures::Stream; +use futures_core::Stream; #[derive(Debug, PartialEq, Copy, Clone)] /// Body size hint @@ -509,11 +509,11 @@ where #[cfg(test)] mod tests { - use futures::{future::poll_fn, stream}; + use futures::stream; use std::io; use super::*; - use crate::util::Ready; + use crate::util::{poll_fn, Ready}; impl Body { pub(crate) fn get_ref(&self) -> &[u8] { diff --git a/ntex/src/http/client/connection.rs b/ntex/src/http/client/connection.rs index 512fc42b..8692e144 100644 --- a/ntex/src/http/client/connection.rs +++ b/ntex/src/http/client/connection.rs @@ -1,7 +1,6 @@ use std::{fmt, future::Future, pin::Pin, time}; use bytes::Bytes; -use futures::future::Either; use h2::client::SendRequest; use crate::codec::{AsyncRead, AsyncWrite, Framed}; @@ -10,7 +9,7 @@ use crate::http::h1::ClientCodec; use crate::http::message::{RequestHeadType, ResponseHead}; use crate::http::payload::Payload; use crate::http::Protocol; -use crate::util::Ready; +use crate::util::{Either, Ready}; use super::error::SendRequestError; use super::pool::Acquired; diff --git a/ntex/src/http/client/connector.rs b/ntex/src/http/client/connector.rs index d874ae70..be7f2d20 100644 --- a/ntex/src/http/client/connector.rs +++ b/ntex/src/http/client/connector.rs @@ -1,13 +1,11 @@ use std::{rc::Rc, task::Context, task::Poll, time::Duration}; -use futures::future::Either; - use crate::codec::{AsyncRead, AsyncWrite}; use crate::connect::{self, Connect as TcpConnect, Connector as TcpConnector}; use crate::http::{Protocol, Uri}; use crate::service::{apply_fn, boxed, Service}; use crate::util::timeout::{TimeoutError, TimeoutService}; -use crate::util::Ready; +use crate::util::{Either, Ready}; use super::connection::Connection; use super::error::ConnectError; @@ -83,7 +81,7 @@ impl Connector { let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap(); let _ = ssl .set_alpn_protos(b"\x02h2\x08http/1.1") - .map_err(|e| error!("Can not set ALPN protocol: {:?}", e)); + .map_err(|e| error!("Cannot set ALPN protocol: {:?}", e)); conn.openssl(ssl.build()) } #[cfg(all(not(feature = "openssl"), feature = "rustls"))] @@ -366,7 +364,7 @@ where #[cfg(test)] mod tests { use super::*; - use futures::future::lazy; + use crate::util::lazy; #[crate::rt_test] async fn test_readiness() { diff --git a/ntex/src/http/client/error.rs b/ntex/src/http/client/error.rs index 0cad13c6..fa43d141 100644 --- a/ntex/src/http/client/error.rs +++ b/ntex/src/http/client/error.rs @@ -2,7 +2,6 @@ use std::{error::Error, io}; use derive_more::{Display, From}; -use either::Either; use serde_json::error::Error as JsonError; #[cfg(feature = "openssl")] @@ -12,6 +11,7 @@ use crate::connect::ResolveError; use crate::http::error::{HttpError, ParseError, PayloadError}; use crate::http::header::HeaderValue; use crate::http::StatusCode; +use crate::util::Either; use crate::ws::ProtocolError; /// Websocket client error diff --git a/ntex/src/http/client/frozen.rs b/ntex/src/http/client/frozen.rs index 7e43db9f..a354e652 100644 --- a/ntex/src/http/client/frozen.rs +++ b/ntex/src/http/client/frozen.rs @@ -1,7 +1,7 @@ use std::{convert::TryFrom, error::Error, fmt, net, rc::Rc, time::Duration}; use bytes::Bytes; -use futures::Stream; +use futures_core::Stream; use serde::Serialize; use crate::http::body::Body; diff --git a/ntex/src/http/client/h1proto.rs b/ntex/src/http/client/h1proto.rs index 7b44a501..bdb5772c 100644 --- a/ntex/src/http/client/h1proto.rs +++ b/ntex/src/http/client/h1proto.rs @@ -1,7 +1,6 @@ use std::{io, io::Write, pin::Pin, task::Context, task::Poll, time}; use bytes::{BufMut, Bytes, BytesMut}; -use futures::{future::poll_fn, SinkExt, Stream, StreamExt}; use crate::codec::{AsyncRead, AsyncWrite, Framed, ReadBuf}; use crate::http::body::{BodySize, MessageBody}; @@ -10,6 +9,8 @@ use crate::http::h1; use crate::http::header::{HeaderMap, HeaderValue, HOST}; use crate::http::message::{RequestHeadType, ResponseHead}; use crate::http::payload::{Payload, PayloadStream}; +use crate::util::{next, poll_fn, send}; +use crate::{Sink, Stream}; use super::connection::{ConnectionLifetime, ConnectionType, IoConnection}; use super::error::{ConnectError, SendRequestError}; @@ -48,7 +49,7 @@ where headers.insert(HOST, value) } }, - Err(e) => log::error!("Can not set HOST header {}", e), + Err(e) => log::error!("Cannot set HOST header {}", e), } } } @@ -61,7 +62,7 @@ where // create Framed and send request let mut framed = Framed::new(io, h1::ClientCodec::default()); - framed.send((head, body.size()).into()).await?; + send(&mut framed, (head, body.size()).into()).await?; // send request body match body.size() { @@ -70,10 +71,8 @@ where }; // read response and init read body - let res = framed.into_future().await; - let (head, framed) = if let (Some(result), framed) = res { - let item = result.map_err(SendRequestError::from)?; - (item, framed) + let head = if let Some(result) = next(&mut framed).await { + result.map_err(SendRequestError::from)? } else { return Err(SendRequestError::from(ConnectError::Disconnected)); }; @@ -85,7 +84,7 @@ where Ok((head, Payload::None)) } _ => { - let pl: PayloadStream = PlStream::new(framed).boxed_local(); + let pl: PayloadStream = Box::pin(PlStream::new(framed)); Ok((head, pl.into())) } } @@ -100,10 +99,10 @@ where { // create Framed and send request let mut framed = Framed::new(io, h1::ClientCodec::default()); - framed.send((head, BodySize::None).into()).await?; + send(&mut framed, (head, BodySize::None).into()).await?; // read response - if let (Some(result), framed) = framed.into_future().await { + if let Some(result) = next(&mut framed).await { let head = result.map_err(SendRequestError::from)?; Ok((head, framed)) } else { @@ -150,7 +149,8 @@ where } } - SinkExt::flush(framed).await?; + poll_fn(|cx| Pin::new(&mut *framed).poll_flush(cx)).await?; + Ok(()) } diff --git a/ntex/src/http/client/h2proto.rs b/ntex/src/http/client/h2proto.rs index 56c1f802..389e8fef 100644 --- a/ntex/src/http/client/h2proto.rs +++ b/ntex/src/http/client/h2proto.rs @@ -1,7 +1,6 @@ use std::{convert::TryFrom, time}; use bytes::Bytes; -use futures::future::poll_fn; use h2::{client::SendRequest, SendStream}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING}; use http::{request::Request, Method, Version}; @@ -11,6 +10,7 @@ use crate::http::body::{BodySize, MessageBody}; use crate::http::header::HeaderMap; use crate::http::message::{RequestHeadType, ResponseHead}; use crate::http::payload::Payload; +use crate::util::poll_fn; use super::connection::{ConnectionType, IoConnection}; use super::error::SendRequestError; diff --git a/ntex/src/http/client/pool.rs b/ntex/src/http/client/pool.rs index 08e4853c..de871ce0 100644 --- a/ntex/src/http/client/pool.rs +++ b/ntex/src/http/client/pool.rs @@ -2,7 +2,6 @@ use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use std::{cell::RefCell, collections::VecDeque, future::Future, pin::Pin, rc::Rc}; -use futures::future::poll_fn; use h2::client::{handshake, Connection, SendRequest}; use http::uri::Authority; @@ -12,7 +11,7 @@ use crate::http::Protocol; use crate::rt::{spawn, time::sleep, time::Sleep}; use crate::service::Service; use crate::task::LocalWaker; -use crate::util::{Bytes, HashMap}; +use crate::util::{poll_fn, Bytes, HashMap}; use super::connection::{ConnectionType, IoConnection}; use super::error::ConnectError; @@ -609,18 +608,14 @@ impl Drop for Acquired { #[cfg(test)] mod tests { - use futures::future::lazy; - use std::cell::RefCell; - use std::convert::TryFrom; - use std::rc::Rc; - use std::time::Duration; + use std::{cell::RefCell, convert::TryFrom, rc::Rc, time::Duration}; use super::*; - use crate::http::client::Connection; - use crate::http::Uri; use crate::rt::time::sleep; - use crate::service::fn_service; - use crate::testing::Io; + use crate::{ + http::client::Connection, http::Uri, service::fn_service, testing::Io, + util::lazy, + }; #[crate::rt_test] async fn test_basics() { diff --git a/ntex/src/http/client/request.rs b/ntex/src/http/client/request.rs index 52ec9de6..aef67c24 100644 --- a/ntex/src/http/client/request.rs +++ b/ntex/src/http/client/request.rs @@ -1,7 +1,7 @@ use std::{convert::TryFrom, error::Error, fmt, net, rc::Rc, time::Duration}; use bytes::Bytes; -use futures::Stream; +use futures_core::Stream; use serde::Serialize; #[cfg(feature = "cookie")] diff --git a/ntex/src/http/client/response.rs b/ntex/src/http/client/response.rs index a372e586..7d8ab89b 100644 --- a/ntex/src/http/client/response.rs +++ b/ntex/src/http/client/response.rs @@ -3,7 +3,7 @@ use std::task::{Context, Poll}; use std::{fmt, future::Future, marker::PhantomData, mem, pin::Pin}; use bytes::{Bytes, BytesMut}; -use futures::Stream; +use futures_core::Stream; use serde::de::DeserializeOwned; #[cfg(feature = "cookie")] diff --git a/ntex/src/http/client/sender.rs b/ntex/src/http/client/sender.rs index 0f3fb470..08088ed8 100644 --- a/ntex/src/http/client/sender.rs +++ b/ntex/src/http/client/sender.rs @@ -3,7 +3,7 @@ use std::{convert::TryFrom, error::Error, future::Future, net, pin::Pin, time}; use bytes::Bytes; use derive_more::From; -use futures::Stream; +use futures_core::Stream; use serde::Serialize; use crate::http::body::{Body, BodyStream}; @@ -82,7 +82,10 @@ impl Future for SendClientRequest { } } - let res = futures::ready!(Pin::new(send).poll(cx)); + let res = match Pin::new(send).poll(cx) { + Poll::Ready(res) => res, + Poll::Pending => return Poll::Pending, + }; #[cfg(feature = "compress")] let res = res.map(|mut res| { diff --git a/ntex/src/http/client/test.rs b/ntex/src/http/client/test.rs index 3f7f501c..56bb4560 100644 --- a/ntex/src/http/client/test.rs +++ b/ntex/src/http/client/test.rs @@ -61,7 +61,7 @@ impl TestResponse { return self; } } - panic!("Can not create header"); + panic!("Cannot create header"); } #[cfg(feature = "cookie")] diff --git a/ntex/src/http/client/ws.rs b/ntex/src/http/client/ws.rs index 6083becc..3f015a22 100644 --- a/ntex/src/http/client/ws.rs +++ b/ntex/src/http/client/ws.rs @@ -3,15 +3,15 @@ use std::{convert::TryFrom, fmt, net::SocketAddr, rc::Rc, str}; #[cfg(feature = "cookie")] use coo_kie::{Cookie, CookieJar}; -use futures::future::{Either, TryFutureExt}; +use nanorand::{WyRand, RNG}; use crate::codec::{AsyncRead, AsyncWrite, Framed}; use crate::framed::{DispatchItem, Dispatcher, State}; +use crate::http::error::HttpError; use crate::http::header::{self, HeaderName, HeaderValue, AUTHORIZATION}; -use crate::http::{ - error::HttpError, ConnectionType, Payload, RequestHead, StatusCode, Uri, -}; +use crate::http::{ConnectionType, Payload, RequestHead, StatusCode, Uri}; use crate::service::{apply_fn, into_service, IntoService, Service}; +use crate::util::Either; use crate::{channel::mpsc, rt, rt::time::timeout, util::sink, util::Ready, ws}; pub use crate::ws::{CloseCode, CloseReason, Frame, Message}; @@ -295,7 +295,8 @@ impl WsRequest { // Generate a random key for the `Sec-WebSocket-Key` header. // a base64-encoded (see Section 4 of [RFC4648]) value that, // when decoded, is 16 bytes in length (RFC 6455) - let sec_key: [u8; 16] = rand::random(); + let mut sec_key: [u8; 16] = [0; 16]; + WyRand::new().fill(&mut sec_key); let key = base64::encode(&sec_key); self.head.headers.insert( @@ -444,7 +445,8 @@ where if let Err(err) = self .start(into_service(move |item| { - srv.call(Ok::<_, ws::WsError<()>>(item)).map_err(|_| ()) + let fut = srv.call(Ok::<_, ws::WsError<()>>(item)); + async move { fut.await.map_err(|_| ()) } })) .await { diff --git a/ntex/src/http/config.rs b/ntex/src/http/config.rs index 17da6946..4d434e63 100644 --- a/ntex/src/http/config.rs +++ b/ntex/src/http/config.rs @@ -1,12 +1,10 @@ -use std::{cell::Cell, cell::RefCell, ptr::copy_nonoverlapping, rc::Rc, time::Duration}; - -use bytes::BytesMut; -use time::OffsetDateTime; +use std::{cell::Cell, cell::RefCell, ptr::copy_nonoverlapping, rc::Rc, time}; use crate::framed::Timer; use crate::http::{Request, Response}; use crate::rt::time::{sleep, sleep_until, Instant, Sleep}; use crate::service::boxed::BoxService; +use crate::util::BytesMut; #[derive(Debug, PartialEq, Clone, Copy)] /// Server keep-alive setting @@ -106,7 +104,7 @@ pub(super) struct DispatcherConfig { pub(super) service: S, pub(super) expect: X, pub(super) upgrade: Option, - pub(super) keep_alive: Duration, + pub(super) keep_alive: time::Duration, pub(super) client_timeout: u64, pub(super) client_disconnect: u64, pub(super) ka_enabled: bool, @@ -131,7 +129,7 @@ impl DispatcherConfig { expect, upgrade, on_request, - keep_alive: Duration::from_secs(cfg.0.keep_alive), + keep_alive: time::Duration::from_secs(cfg.0.keep_alive), client_timeout: cfg.0.client_timeout, client_disconnect: cfg.0.client_disconnect, ka_enabled: cfg.0.ka_enabled, @@ -207,7 +205,7 @@ impl DateServiceInner { self.current_time.set(Instant::now()); let mut bytes = DATE_VALUE_DEFAULT; - let dt = OffsetDateTime::now_utc().format("%a, %d %b %Y %H:%M:%S GMT"); + let dt = httpdate::HttpDate::from(time::SystemTime::now()).to_string(); bytes[6..35].copy_from_slice(dt.as_ref()); self.current_date.set(bytes); } @@ -225,7 +223,7 @@ impl DateService { // periodic date update let s = self.clone(); crate::rt::spawn(async move { - sleep(Duration::from_millis(500)).await; + sleep(time::Duration::from_millis(500)).await; s.0.current.set(false); }); } diff --git a/ntex/src/http/encoding/decoder.rs b/ntex/src/http/encoding/decoder.rs index f31c8e23..32329b86 100644 --- a/ntex/src/http/encoding/decoder.rs +++ b/ntex/src/http/encoding/decoder.rs @@ -1,12 +1,9 @@ -use std::future::Future; -use std::io::{self, Write}; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::{future::Future, io, io::Write, pin::Pin, task::Context, task::Poll}; use brotli2::write::BrotliDecoder; use bytes::Bytes; use flate2::write::{GzDecoder, ZlibDecoder}; -use futures::{ready, Stream}; +use futures_core::Stream; use super::Writer; use crate::http::error::PayloadError; @@ -79,10 +76,11 @@ where ) -> Poll> { loop { if let Some(ref mut fut) = self.fut { - let (chunk, decoder) = match ready!(Pin::new(fut).poll(cx)) { - Ok(Ok(item)) => item, - Ok(Err(e)) => return Poll::Ready(Some(Err(e.into()))), - Err(e) => return Poll::Ready(Some(Err(e.into()))), + let (chunk, decoder) = match Pin::new(fut).poll(cx) { + Poll::Ready(Ok(Ok(item))) => item, + Poll::Ready(Ok(Err(e))) => return Poll::Ready(Some(Err(e.into()))), + Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), + Poll::Pending => return Poll::Pending, }; self.decoder = Some(decoder); self.fut.take(); diff --git a/ntex/src/http/encoding/encoder.rs b/ntex/src/http/encoding/encoder.rs index 0753b6f3..faace54c 100644 --- a/ntex/src/http/encoding/encoder.rs +++ b/ntex/src/http/encoding/encoder.rs @@ -1,14 +1,9 @@ //! Stream encoder -use std::error::Error; -use std::future::Future; -use std::io::{self, Write}; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::{future::Future, io, io::Write, pin::Pin, task::Context, task::Poll}; use brotli2::write::BrotliEncoder; use bytes::Bytes; use flate2::write::{GzEncoder, ZlibEncoder}; -use futures::ready; use crate::http::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::http::header::{ContentEncoding, HeaderValue, CONTENT_ENCODING}; @@ -88,22 +83,25 @@ impl MessageBody for Encoder { fn poll_next_chunk( &mut self, cx: &mut Context<'_>, - ) -> Poll>>> { + ) -> Poll>>> { loop { if self.eof { return Poll::Ready(None); } if let Some(ref mut fut) = self.fut { - let mut encoder = match ready!(Pin::new(fut).poll(cx)) { - Ok(Ok(item)) => item, - Ok(Err(e)) => return Poll::Ready(Some(Err(Box::new(e)))), - Err(_) => { + let mut encoder = match Pin::new(fut).poll(cx) { + Poll::Ready(Ok(Ok(item))) => item, + Poll::Ready(Ok(Err(e))) => { + return Poll::Ready(Some(Err(Box::new(e)))) + } + Poll::Ready(Err(_)) => { return Poll::Ready(Some(Err(Box::new(io::Error::new( io::ErrorKind::Other, "Canceled", ))))); } + Poll::Pending => return Poll::Pending, }; let chunk = encoder.take(); self.encoder = Some(encoder); diff --git a/ntex/src/http/error.rs b/ntex/src/http/error.rs index cdc7ce09..1c3677a9 100644 --- a/ntex/src/http/error.rs +++ b/ntex/src/http/error.rs @@ -1,16 +1,16 @@ //! Http related errors use std::{fmt, io, io::Write, str::Utf8Error, string::FromUtf8Error}; -use either::Either; use http::{header, uri::InvalidUri, StatusCode}; // re-export for convinience -pub use futures::channel::oneshot::Canceled; +pub use crate::channel::Canceled; pub use http::Error as HttpError; use crate::http::body::Body; use crate::http::response::Response; use crate::rt::task::JoinError; +use crate::util::Either; /// Error that can be converted to `Response` pub trait ResponseError: fmt::Display + fmt::Debug { @@ -128,7 +128,7 @@ pub enum PayloadError { )] Incomplete(Option), /// Content encoding stream corruption - #[display(fmt = "Can not decode content-encoding.")] + #[display(fmt = "Cannot decode content-encoding.")] EncodingCorrupted, /// A payload reached size limit. #[display(fmt = "A payload reached size limit.")] @@ -219,8 +219,8 @@ impl std::error::Error for DispatchError {} /// A set of error that can occure during parsing content type #[derive(PartialEq, Debug, Display)] pub enum ContentTypeError { - /// Can not parse content type - #[display(fmt = "Can not parse content type")] + /// Cannot parse content type + #[display(fmt = "Cannot parse content type")] ParseError, /// Unknown content encoding #[display(fmt = "Unknown content encoding")] diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index d8a1bbbd..ddd359ed 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -734,18 +734,14 @@ mod tests { use std::{cell::Cell, io, sync::Arc}; use bytes::{Bytes, BytesMut}; - use futures::future::{err, lazy, ok, FutureExt}; - use futures::StreamExt; use rand::Rng; use super::*; - use crate::codec::Decoder; use crate::http::config::{DispatcherConfig, ServiceConfig}; use crate::http::h1::{ClientCodec, ExpectHandler, UpgradeHandler}; use crate::http::{body, Request, ResponseHead, StatusCode}; - use crate::rt::time::sleep; use crate::service::{boxed, fn_service, IntoService}; - use crate::testing::Io; + use crate::{codec::Decoder, rt::time::sleep, testing::Io, util::lazy, util::next}; const BUFFER_SIZE: usize = 32_768; @@ -815,12 +811,14 @@ mod tests { server, Rc::new(DispatcherConfig::new( ServiceConfig::default(), - fn_service(|_| ok::<_, io::Error>(Response::Ok().finish())), + fn_service(|_| { + Box::pin(async { Ok::<_, io::Error>(Response::Ok().finish()) }) + }), ExpectHandler, None, Some(boxed::service(crate::into_service(move |(req, _)| { data2.set(true); - ok(req) + Box::pin(async move { Ok(req) }) }))), )), None, @@ -842,7 +840,9 @@ mod tests { client.remote_buffer_cap(1024); client.write("GET /test HTTP/1\r\n\r\n"); - let mut h1 = h1(server, |_| ok::<_, io::Error>(Response::Ok().finish())); + let mut h1 = h1(server, |_| { + Box::pin(async { Ok::<_, io::Error>(Response::Ok().finish()) }) + }); sleep(time::Duration::from_millis(50)).await; assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready()); @@ -863,7 +863,9 @@ mod tests { let (client, server) = Io::create(); client.remote_buffer_cap(4096); let mut decoder = ClientCodec::default(); - spawn_h1(server, |_| ok::<_, io::Error>(Response::Ok().finish())); + spawn_h1(server, |_| async { + Ok::<_, io::Error>(Response::Ok().finish()) + }); client.write("GET /test HTTP/1.1\r\n\r\n"); @@ -891,7 +893,7 @@ mod tests { let mut decoder = ClientCodec::default(); spawn_h1(server, |mut req: Request| async move { let mut p = req.take_payload(); - while let Some(_) = p.next().await {} + while let Some(_) = next(&mut p).await {} Ok::<_, io::Error>(Response::Ok().finish()) }); @@ -963,7 +965,7 @@ mod tests { let (client, server) = Io::create(); spawn_h1(server, move |_| { num2.fetch_add(1, Ordering::Relaxed); - ok::<_, io::Error>(Response::Ok().finish()) + async { Ok::<_, io::Error>(Response::Ok().finish()) } }); client.remote_buffer_cap(1024); @@ -983,7 +985,9 @@ mod tests { let (client, server) = Io::create(); client.remote_buffer_cap(4096); - let mut h1 = h1(server, |_| ok::<_, io::Error>(Response::Ok().finish())); + let mut h1 = h1(server, |_| { + Box::pin(async { Ok::<_, io::Error>(Response::Ok().finish()) }) + }); h1.inner.state.set_buffer_params(16 * 1024, 16 * 1024, 1024); let mut decoder = ClientCodec::default(); @@ -1018,7 +1022,7 @@ mod tests { async move { // read one chunk let mut pl = req.take_payload(); - let _ = pl.next().await.unwrap().unwrap(); + let _ = next(&mut pl).await.unwrap().unwrap(); m.store(true, Ordering::Relaxed); // sleep sleep(time::Duration::from_secs(999_999)).await; @@ -1071,8 +1075,9 @@ mod tests { let (client, server) = Io::create(); let mut h1 = h1(server, move |_| { let n = num2.clone(); - async move { Ok::<_, io::Error>(Response::Ok().message_body(Stream(n.clone()))) } - .boxed_local() + Box::pin(async move { + Ok::<_, io::Error>(Response::Ok().message_body(Stream(n.clone()))) + }) }); let state = h1.inner.state.clone(); @@ -1128,7 +1133,9 @@ mod tests { let (client, server) = Io::create(); client.remote_buffer_cap(4096); let mut h1 = h1(server, |_| { - ok::<_, io::Error>(Response::Ok().message_body(Stream(false))) + Box::pin(async { + Ok::<_, io::Error>(Response::Ok().message_body(Stream(false))) + }) }); client.write("GET /test HTTP/1.1\r\n\r\n"); @@ -1155,7 +1162,9 @@ mod tests { client.write("GET /test HTTP/1.1\r\ncontent-length:512\r\n\r\n"); let mut h1 = h1(server, |_| { - err::, _>(io::Error::new(io::ErrorKind::Other, "error")) + Box::pin(async { + Err::, _>(io::Error::new(io::ErrorKind::Other, "error")) + }) }); sleep(time::Duration::from_millis(50)).await; diff --git a/ntex/src/http/h1/payload.rs b/ntex/src/http/h1/payload.rs index fe54fae1..2f7b7498 100644 --- a/ntex/src/http/h1/payload.rs +++ b/ntex/src/http/h1/payload.rs @@ -1,12 +1,10 @@ //! Payload stream -use std::cell::RefCell; -use std::collections::VecDeque; -use std::pin::Pin; use std::rc::{Rc, Weak}; use std::task::{Context, Poll}; +use std::{cell::RefCell, collections::VecDeque, pin::Pin}; use bytes::Bytes; -use futures::Stream; +use futures_core::Stream; use crate::http::error::PayloadError; use crate::task::LocalWaker; @@ -210,7 +208,7 @@ impl Inner { #[cfg(test)] mod tests { use super::*; - use futures::future::poll_fn; + use crate::util::poll_fn; #[crate::rt_test] async fn test_unread_data() { diff --git a/ntex/src/http/h1/upgrade.rs b/ntex/src/http/h1/upgrade.rs index 9c4984b0..41080e13 100644 --- a/ntex/src/http/h1/upgrade.rs +++ b/ntex/src/http/h1/upgrade.rs @@ -1,11 +1,8 @@ use std::{io, marker::PhantomData, task::Context, task::Poll}; -use futures::future::Ready; - -use crate::framed::State; use crate::http::h1::Codec; use crate::http::request::Request; -use crate::{Service, ServiceFactory}; +use crate::{framed::State, util::Ready, Service, ServiceFactory}; pub struct UpgradeHandler(PhantomData); @@ -16,7 +13,7 @@ impl ServiceFactory for UpgradeHandler { type Error = io::Error; type Service = UpgradeHandler; type InitError = io::Error; - type Future = Ready>; + type Future = Ready; #[inline] fn new_service(&self, _: ()) -> Self::Future { @@ -28,7 +25,7 @@ impl Service for UpgradeHandler { type Request = (Request, T, State, Codec); type Response = (); type Error = io::Error; - type Future = Ready>; + type Future = Ready; #[inline] fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { diff --git a/ntex/src/http/h2/mod.rs b/ntex/src/http/h2/mod.rs index 02e02197..15b0b720 100644 --- a/ntex/src/http/h2/mod.rs +++ b/ntex/src/http/h2/mod.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use bytes::Bytes; -use futures::Stream; +use futures_core::Stream; use h2::RecvStream; mod dispatcher; diff --git a/ntex/src/http/payload.rs b/ntex/src/http/payload.rs index 0441aac4..4b6fc563 100644 --- a/ntex/src/http/payload.rs +++ b/ntex/src/http/payload.rs @@ -1,7 +1,7 @@ use std::{fmt, mem, pin::Pin, task::Context, task::Poll}; use bytes::Bytes; -use futures::Stream; +use futures_core::Stream; use h2::RecvStream; use super::error::PayloadError; diff --git a/ntex/src/http/response.rs b/ntex/src/http/response.rs index 7e4f4891..b04cd8e1 100644 --- a/ntex/src/http/response.rs +++ b/ntex/src/http/response.rs @@ -2,7 +2,6 @@ use std::{cell::Ref, cell::RefMut, convert::TryFrom, error::Error, fmt, str}; use bytes::{Bytes, BytesMut}; -use futures::Stream; use serde::Serialize; #[cfg(feature = "cookie")] @@ -13,7 +12,7 @@ use crate::http::error::{HttpError, ResponseError}; use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue}; use crate::http::message::{ConnectionType, Message, ResponseHead}; use crate::http::StatusCode; -use crate::util::Extensions; +use crate::{util::Extensions, Stream}; /// An HTTP Response pub struct Response { diff --git a/ntex/src/http/test.rs b/ntex/src/http/test.rs index 7caefd03..371da34a 100644 --- a/ntex/src/http/test.rs +++ b/ntex/src/http/test.rs @@ -112,7 +112,7 @@ impl TestRequest { return self; } } - panic!("Can not create header"); + panic!("Cannot create header"); } #[cfg(feature = "cookie")] @@ -242,7 +242,7 @@ pub fn server>(factory: F) -> TestServer { builder.set_verify(SslVerifyMode::NONE); let _ = builder .set_alpn_protos(b"\x02h2\x08http/1.1") - .map_err(|e| log::error!("Can not set alpn protocol: {:?}", e)); + .map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e)); Connector::default() .timeout(time::Duration::from_millis(30000)) .openssl(builder.build()) diff --git a/ntex/src/lib.rs b/ntex/src/lib.rs index b5e78bea..826d7c50 100644 --- a/ntex/src/lib.rs +++ b/ntex/src/lib.rs @@ -46,6 +46,9 @@ pub mod ws; pub use self::service::*; +pub use futures_core::stream::Stream; +pub use futures_sink::Sink; + pub mod codec { //! Utilities for encoding and decoding frames. pub use ntex_codec::*; diff --git a/ntex/src/server/accept.rs b/ntex/src/server/accept.rs index fa1b9c01..602316f4 100644 --- a/ntex/src/server/accept.rs +++ b/ntex/src/server/accept.rs @@ -55,13 +55,13 @@ impl AcceptLoop { pub(super) fn new(srv: Server) -> AcceptLoop { // Create a poll instance let poll = mio::Poll::new() - .map_err(|e| panic!("Can not create mio::Poll {}", e)) + .map_err(|e| panic!("Cannot create mio::Poll {}", e)) .unwrap(); let (tx, rx) = sync_mpsc::channel(); let waker = Arc::new( mio::Waker::new(poll.registry(), NOTIFY) - .map_err(|e| panic!("Can not create mio::Waker {}", e)) + .map_err(|e| panic!("Cannot create mio::Waker {}", e)) .unwrap(), ); let notify = AcceptNotify::new(waker, tx); @@ -159,7 +159,7 @@ impl Accept { mio::Token(token + DELTA), mio::Interest::READABLE, ) { - panic!("Can not register io: {}", err); + panic!("Cannot register io: {}", err); } entry.insert(ServerSocketInfo { @@ -231,7 +231,7 @@ impl Accept { mio::Token(token + DELTA), mio::Interest::READABLE, ) { - error!("Can not register server socket {}", err); + error!("Cannot register server socket {}", err); } else { info!("Resume accepting connections on {}", info.addr); } @@ -253,7 +253,7 @@ impl Accept { if let Err(err) = self.poll.registry().deregister(&mut info.sock) { - error!("Can not deregister server socket {}", err); + error!("Cannot deregister server socket {}", err); } else { info!("Paused accepting connections on {}", info.addr); } @@ -266,7 +266,7 @@ impl Accept { mio::Token(token + DELTA), mio::Interest::READABLE, ) { - error!("Can not resume socket accept process: {}", err); + error!("Cannot resume socket accept process: {}", err); } else { info!( "Accepting connections on {} has been resumed", @@ -321,7 +321,7 @@ impl Accept { mio::Token(token + DELTA), mio::Interest::READABLE, ) { - error!("Can not resume socket accept process: {}", err); + error!("Cannot resume socket accept process: {}", err); } else { info!("Accepting connections on {} has been resumed", info.addr); } @@ -412,7 +412,7 @@ impl Accept { error!("Error accepting connection: {}", e); if let Err(err) = self.poll.registry().deregister(&mut info.sock) { - error!("Can not deregister server socket {}", err); + error!("Cannot deregister server socket {}", err); } // sleep after error diff --git a/ntex/src/server/builder.rs b/ntex/src/server/builder.rs index c5af705a..6d7489ec 100644 --- a/ntex/src/server/builder.rs +++ b/ntex/src/server/builder.rs @@ -1,16 +1,13 @@ -use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::Duration; -use std::{io, mem, net}; +use std::{future::Future, io, mem, net, pin::Pin, time::Duration}; -use futures::channel::mpsc::{unbounded, UnboundedReceiver}; -use futures::channel::oneshot; -use futures::stream::FuturesUnordered; -use futures::{ready, Future, Stream, StreamExt}; use log::{error, info}; use socket2::{Domain, SockAddr, Socket, Type}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; +use tokio::sync::oneshot; use crate::rt::{net::TcpStream, spawn, time::sleep, System}; +use crate::util::join_all; use super::accept::{AcceptLoop, AcceptNotify, Command}; use super::config::{ConfiguredService, ServiceConfig}; @@ -48,7 +45,7 @@ impl Default for ServerBuilder { impl ServerBuilder { /// Create new Server builder instance pub fn new() -> ServerBuilder { - let (tx, rx) = unbounded(); + let (tx, rx) = unbounded_channel(); let server = Server::new(tx); ServerBuilder { @@ -368,15 +365,14 @@ impl ServerBuilder { // stop workers if !self.workers.is_empty() && graceful { - let fut = self + let futs: Vec<_> = self .workers .iter() .map(move |worker| worker.1.stop(graceful)) - .collect::>() - .collect::>(); + .collect(); spawn(async move { - let _ = fut.await; + let _ = join_all(futs).await; if let Some(tx) = completion { let _ = tx.send(()); @@ -443,11 +439,10 @@ impl Future for ServerBuilder { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match ready!(Pin::new(&mut self.cmd).poll_next(cx)) { - Some(it) => self.as_mut().get_mut().handle_cmd(it), - None => { - return Poll::Pending; - } + match Pin::new(&mut self.cmd).poll_recv(cx) { + Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it), + Poll::Ready(None) => return Poll::Pending, + Poll::Pending => return Poll::Pending, } } } @@ -476,7 +471,7 @@ pub(super) fn bind_addr( } else { Err(io::Error::new( io::ErrorKind::Other, - "Can not bind to address.", + "Cannot bind to address.", )) } } else { @@ -513,7 +508,6 @@ mod tests { #[cfg(unix)] #[crate::rt_test] async fn test_signals() { - use futures::future::ok; use std::sync::mpsc; use std::{net, thread, time}; @@ -525,7 +519,9 @@ mod tests { crate::server::build() .workers(1) .disable_signals() - .bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) + .bind("test", addr, move || { + fn_service(|_| async { Ok::<_, ()>(()) }) + }) .unwrap() .start() }); diff --git a/ntex/src/server/config.rs b/ntex/src/server/config.rs index f2f409bf..87477779 100644 --- a/ntex/src/server/config.rs +++ b/ntex/src/server/config.rs @@ -155,7 +155,7 @@ impl InternalServiceFactory for ConfiguredService { res.push((token, serv)); } Err(_) => { - error!("Can not construct service"); + error!("Cannot construct service"); return Err(()); } } @@ -323,7 +323,7 @@ where match fut.await { Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService), Err(e) => { - error!("Can not construct service: {:?}", e); + error!("Cannot construct service: {:?}", e); Err(()) } } diff --git a/ntex/src/server/mod.rs b/ntex/src/server/mod.rs index 63e2844b..67c50764 100644 --- a/ntex/src/server/mod.rs +++ b/ntex/src/server/mod.rs @@ -1,14 +1,11 @@ //! General purpose tcp server #![allow(clippy::type_complexity)] -use std::error::Error; -use std::future::Future; -use std::io; -use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; +use std::{future::Future, io, pin::Pin}; -use futures::channel::mpsc::UnboundedSender; -use futures::channel::oneshot; +use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::oneshot; use crate::util::counter::Counter; @@ -74,7 +71,7 @@ thread_local! { /// Ssl error combinded with service error. #[derive(Debug)] pub enum SslError { - Ssl(Box), + Ssl(Box), Service(E), } @@ -111,11 +108,11 @@ impl Server { } fn signal(&self, sig: signals::Signal) { - let _ = self.0.unbounded_send(ServerCommand::Signal(sig)); + let _ = self.0.send(ServerCommand::Signal(sig)); } fn worker_faulted(&self, idx: usize) { - let _ = self.0.unbounded_send(ServerCommand::WorkerFaulted(idx)); + let _ = self.0.send(ServerCommand::WorkerFaulted(idx)); } /// Pause accepting incoming connections @@ -124,7 +121,7 @@ impl Server { /// All opened connection remains active. pub fn pause(&self) -> impl Future { let (tx, rx) = oneshot::channel(); - let _ = self.0.unbounded_send(ServerCommand::Pause(tx)); + let _ = self.0.send(ServerCommand::Pause(tx)); async move { let _ = rx.await; } @@ -133,7 +130,7 @@ impl Server { /// Resume accepting incoming connections pub fn resume(&self) -> impl Future { let (tx, rx) = oneshot::channel(); - let _ = self.0.unbounded_send(ServerCommand::Resume(tx)); + let _ = self.0.send(ServerCommand::Resume(tx)); async move { let _ = rx.await; } @@ -144,7 +141,7 @@ impl Server { /// If server starts with `spawn()` method, then spawned thread get terminated. pub fn stop(&self, graceful: bool) -> impl Future { let (tx, rx) = oneshot::channel(); - let _ = self.0.unbounded_send(ServerCommand::Stop { + let _ = self.0.send(ServerCommand::Stop { graceful, completion: Some(tx), }); @@ -168,7 +165,7 @@ impl Future for Server { if this.1.is_none() { let (tx, rx) = oneshot::channel(); - if this.0.unbounded_send(ServerCommand::Notify(tx)).is_err() { + if this.0.send(ServerCommand::Notify(tx)).is_err() { return Poll::Ready(Ok(())); } this.1 = Some(rx); diff --git a/ntex/src/server/openssl.rs b/ntex/src/server/openssl.rs index 988f8484..645ac0f5 100644 --- a/ntex/src/server/openssl.rs +++ b/ntex/src/server/openssl.rs @@ -155,10 +155,10 @@ impl Future for AcceptorServiceResponse { } let io = this.io.as_mut().unwrap(); - let res = futures::ready!(Pin::new(io).poll_accept(cx)); - match res { - Ok(_) => Poll::Ready(Ok(this.io.take().unwrap())), - Err(e) => Poll::Ready(Err(Box::new(e))), + match Pin::new(io).poll_accept(cx) { + Poll::Ready(Ok(_)) => Poll::Ready(Ok(this.io.take().unwrap())), + Poll::Ready(Err(e)) => Poll::Ready(Err(Box::new(e))), + Poll::Pending => Poll::Pending, } } } diff --git a/ntex/src/server/rustls.rs b/ntex/src/server/rustls.rs index e145cd42..28efcb14 100644 --- a/ntex/src/server/rustls.rs +++ b/ntex/src/server/rustls.rs @@ -144,10 +144,10 @@ impl Future for AcceptorServiceFut { } } - let res = futures::ready!(Pin::new(&mut this.fut).poll(cx)); - match res { - Ok(io) => Poll::Ready(Ok(io)), - Err(e) => Poll::Ready(Err(Box::new(e))), + match Pin::new(&mut this.fut).poll(cx) { + Poll::Ready(Ok(io)) => Poll::Ready(Ok(io)), + Poll::Ready(Err(e)) => Poll::Ready(Err(Box::new(e))), + Poll::Pending => Poll::Pending, } } } diff --git a/ntex/src/server/service.rs b/ntex/src/server/service.rs index 058268ae..21dcce4c 100644 --- a/ntex/src/server/service.rs +++ b/ntex/src/server/service.rs @@ -83,7 +83,7 @@ where match req { ServerMessage::Connect(stream) => { let stream = FromStream::from_stream(stream).map_err(|e| { - error!("Can not convert to an async io stream: {}", e); + error!("Cannot convert to an async io stream: {}", e); }); if let Ok(stream) = stream { diff --git a/ntex/src/server/signals.rs b/ntex/src/server/signals.rs index f439ab6a..b66eb3c4 100644 --- a/ntex/src/server/signals.rs +++ b/ntex/src/server/signals.rs @@ -50,7 +50,7 @@ impl Signals { match unix::signal(*kind) { Ok(stream) => signals.push((*sig, stream)), Err(e) => log::error!( - "Can not initialize stream handler for {:?} err: {}", + "Cannot initialize stream handler for {:?} err: {}", sig, e ), diff --git a/ntex/src/server/worker.rs b/ntex/src/server/worker.rs index fe45bca5..8532736f 100644 --- a/ntex/src/server/worker.rs +++ b/ntex/src/server/worker.rs @@ -1,15 +1,13 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::task::{Context, Poll}; -use std::{pin::Pin, sync::Arc, time}; +use std::{future::Future, pin::Pin, sync::Arc, time}; -use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; -use futures::channel::oneshot; -use futures::future::join_all; -use futures::{Future, Stream as FutStream}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot; use crate::rt::time::{sleep_until, Instant, Sleep}; use crate::rt::{spawn, Arbiter}; -use crate::util::counter::Counter; +use crate::util::{counter::Counter, join_all}; use super::accept::{AcceptNotify, Command}; use super::service::{BoxedServerService, InternalServiceFactory, ServerMessage}; @@ -78,9 +76,7 @@ impl WorkerClient { } pub(super) fn send(&self, msg: Connection) -> Result<(), Connection> { - self.tx1 - .unbounded_send(WorkerCommand(msg)) - .map_err(|msg| msg.into_inner().0) + self.tx1.send(WorkerCommand(msg)).map_err(|msg| msg.0 .0) } pub(super) fn available(&self) -> bool { @@ -89,7 +85,7 @@ impl WorkerClient { pub(super) fn stop(&self, graceful: bool) -> oneshot::Receiver { let (result, rx) = oneshot::channel(); - let _ = self.tx2.unbounded_send(StopCommand { graceful, result }); + let _ = self.tx2.send(StopCommand { graceful, result }); rx } } @@ -165,8 +161,8 @@ impl Worker { availability: WorkerAvailability, shutdown_timeout: time::Duration, ) -> WorkerClient { - let (tx1, rx1) = unbounded(); - let (tx2, rx2) = unbounded(); + let (tx1, rx1) = unbounded_channel(); + let (tx2, rx2) = unbounded_channel(); let avail = availability.clone(); Arbiter::default().exec_fn(move || { @@ -178,7 +174,7 @@ impl Worker { let _ = spawn(wrk); } Err(e) => { - error!("Can not start worker: {:?}", e); + error!("Cannot start worker: {:?}", e); Arbiter::current().stop(); } } @@ -221,8 +217,7 @@ impl Worker { })); } - let res = join_all(fut).await; - let res: Result, _> = res.into_iter().collect(); + let res: Result, _> = join_all(fut).await.into_iter().collect(); match res { Ok(services) => { for item in services { @@ -335,7 +330,7 @@ impl Future for Worker { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // `StopWorker` message handler if let Poll::Ready(Some(StopCommand { graceful, result })) = - Pin::new(&mut self.rx2).poll_next(cx) + Pin::new(&mut self.rx2).poll_recv(cx) { self.availability.set(false); let num = num_connections(); @@ -409,7 +404,7 @@ impl Future for Worker { } Poll::Ready(Err(_)) => { panic!( - "Can not restart {:?} service", + "Cannot restart {:?} service", self.factories[idx].name(token) ); } @@ -475,7 +470,7 @@ impl Future for Worker { } } - match Pin::new(&mut self.rx).poll_next(cx) { + match Pin::new(&mut self.rx).poll_recv(cx) { // handle incoming io stream Poll::Ready(Some(WorkerCommand(msg))) => { let guard = self.conns.get(); @@ -502,14 +497,13 @@ impl Future for Worker { #[cfg(test)] mod tests { - use futures::future::{lazy, ok, Ready}; - use futures::SinkExt; use std::sync::{Arc, Mutex}; use super::*; use crate::rt::net::TcpStream; use crate::server::service::Factory; use crate::service::{Service, ServiceFactory}; + use crate::util::{lazy, Ready}; #[derive(Clone, Copy, Debug)] enum St { @@ -531,12 +525,12 @@ mod tests { type Service = Srv; type Config = (); type InitError = (); - type Future = Ready>; + type Future = Ready; fn new_service(&self, _: ()) -> Self::Future { let mut cnt = self.counter.lock().unwrap(); *cnt += 1; - ok(Srv { + Ready::ok(Srv { st: self.st.clone(), }) } @@ -550,7 +544,7 @@ mod tests { type Request = TcpStream; type Response = (); type Error = (); - type Future = Ready>; + type Future = Ready<(), ()>; fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { let st: St = { *self.st.lock().unwrap() }; @@ -572,15 +566,15 @@ mod tests { } fn call(&self, _: TcpStream) -> Self::Future { - ok(()) + Ready::ok(()) } } #[crate::rt_test] #[allow(clippy::mutex_atomic)] async fn basics() { - let (_tx1, rx1) = unbounded(); - let (mut tx2, rx2) = unbounded(); + let (_tx1, rx1) = unbounded_channel(); + let (tx2, rx2) = unbounded_channel(); let (sync_tx, _sync_rx) = std::sync::mpsc::channel(); let poll = mio::Poll::new().unwrap(); let waker = Arc::new(mio::Waker::new(poll.registry(), mio::Token(1)).unwrap()); @@ -650,7 +644,6 @@ mod tests { graceful: true, result: tx, }) - .await .unwrap(); let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await; @@ -660,8 +653,8 @@ mod tests { let _ = rx.await; // force shutdown - let (_tx1, rx1) = unbounded(); - let (mut tx2, rx2) = unbounded(); + let (_tx1, rx1) = unbounded_channel(); + let (tx2, rx2) = unbounded_channel(); let avail = WorkerAvailability::new(AcceptNotify::new(waker, sync_tx.clone())); let f = SrvFactory { st: st.clone(), @@ -695,7 +688,6 @@ mod tests { graceful: false, result: tx, }) - .await .unwrap(); assert!(lazy(|cx| Pin::new(&mut worker).poll(cx)).await.is_ready()); diff --git a/ntex/src/testing.rs b/ntex/src/testing.rs index 4c93b35e..694c6702 100644 --- a/ntex/src/testing.rs +++ b/ntex/src/testing.rs @@ -1,15 +1,30 @@ use std::cell::{Cell, RefCell}; -use std::pin::Pin; use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll}; -use std::{cmp, io, mem, time}; +use std::task::{Context, Poll, Waker}; +use std::{cmp, fmt, io, mem, pin::Pin, time}; use bytes::BytesMut; -use futures::future::poll_fn; -use futures::task::AtomicWaker; use crate::codec::{AsyncRead, AsyncWrite, ReadBuf}; use crate::rt::time::sleep; +use crate::util::poll_fn; + +#[derive(Default)] +struct AtomicWaker(Arc>>>); + +impl AtomicWaker { + fn wake(&self) { + if let Some(waker) = self.0.lock().unwrap().borrow_mut().take() { + waker.wake() + } + } +} + +impl fmt::Debug for AtomicWaker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "AtomicWaker") + } +} /// Async io stream #[derive(Debug)] @@ -196,7 +211,8 @@ impl Io { if closed { Poll::Ready(()) } else { - read.waker.register(cx.waker()); + *read.waker.0.lock().unwrap().borrow_mut() = + Some(cx.waker().clone()); drop(read); drop(guard); Poll::Pending @@ -248,7 +264,7 @@ impl AsyncRead for Io { ) -> Poll> { let guard = self.local.lock().unwrap(); let mut ch = guard.borrow_mut(); - ch.waker.register(cx.waker()); + *ch.waker.0.lock().unwrap().borrow_mut() = Some(cx.waker().clone()); if !ch.buf.is_empty() { let size = std::cmp::min(ch.buf.len(), buf.remaining()); @@ -288,23 +304,31 @@ impl AsyncWrite for Io { ch.waker.wake(); Poll::Ready(Ok(cap)) } else { - self.local + *self + .local .lock() .unwrap() .borrow_mut() .waker - .register(cx.waker()); + .0 + .lock() + .unwrap() + .borrow_mut() = Some(cx.waker().clone()); Poll::Pending } } IoState::Close => Poll::Ready(Ok(0)), IoState::Pending => { - self.local + *self + .local .lock() .unwrap() .borrow_mut() .waker - .register(cx.waker()); + .0 + .lock() + .unwrap() + .borrow_mut() = Some(cx.waker().clone()); Poll::Pending } IoState::Err(e) => Poll::Ready(Err(e)), diff --git a/ntex/src/util/buffer.rs b/ntex/src/util/buffer.rs index cb81296b..b1be521a 100644 --- a/ntex/src/util/buffer.rs +++ b/ntex/src/util/buffer.rs @@ -5,13 +5,10 @@ use std::{ collections::VecDeque, convert::Infallible, future::Future, pin::Pin, rc::Rc, }; -use futures::future::Either; -use futures::ready; - use crate::channel::oneshot; use crate::service::{IntoService, Service, Transform}; use crate::task::LocalWaker; -use crate::util::Ready; +use crate::util::{Either, Ready}; /// Buffer - service factory for service that can buffer incoming request. /// @@ -223,7 +220,10 @@ impl, E> Future for BufferServiceResponse { Poll::Pending => return Poll::Pending, }, StateProject::Srv { fut, inner } => { - let res = ready!(fut.poll(cx)); + let res = match fut.poll(cx) { + Poll::Ready(res) => res, + Poll::Pending => return Poll::Pending, + }; inner.waker.wake(); return Poll::Ready(res); } @@ -238,7 +238,7 @@ mod tests { use super::*; use crate::service::{apply, fn_factory, Service, ServiceFactory}; - use futures::future::lazy; + use crate::util::lazy; #[derive(Clone)] struct TestService(Rc); diff --git a/ntex/src/util/inflight.rs b/ntex/src/util/inflight.rs index 3f2022bf..bf890f9e 100644 --- a/ntex/src/util/inflight.rs +++ b/ntex/src/util/inflight.rs @@ -119,7 +119,7 @@ mod tests { use super::*; use crate::service::{apply, fn_factory, Service, ServiceFactory}; - use futures::future::{lazy, ok}; + use crate::util::lazy; struct SleepService(Duration); @@ -162,7 +162,10 @@ mod tests { async fn test_newtransform() { let wait_time = Duration::from_millis(50); - let srv = apply(InFlight::new(1), fn_factory(|| ok(SleepService(wait_time)))); + let srv = apply( + InFlight::new(1), + fn_factory(|| async { Ok(SleepService(wait_time)) }), + ); let srv = srv.new_service(&()).await.unwrap(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); diff --git a/ntex/src/util/keepalive.rs b/ntex/src/util/keepalive.rs index 65b91c64..e2e83e74 100644 --- a/ntex/src/util/keepalive.rs +++ b/ntex/src/util/keepalive.rs @@ -139,11 +139,10 @@ where #[cfg(test)] mod tests { - use futures::future::lazy; - use super::*; use crate::rt::time::sleep; use crate::service::{Service, ServiceFactory}; + use crate::util::lazy; #[derive(Debug, PartialEq)] struct TestErr; diff --git a/ntex/src/util/mod.rs b/ntex/src/util/mod.rs index 14a5e464..9a2dfefe 100644 --- a/ntex/src/util/mod.rs +++ b/ntex/src/util/mod.rs @@ -1,3 +1,5 @@ +use std::{future::Future, pin::Pin, task::Context, task::Poll}; + pub mod buffer; pub mod counter; mod extensions; @@ -11,11 +13,162 @@ pub mod variant; pub use self::extensions::Extensions; -pub use ntex_service::util::{lazy, Lazy, Ready}; +pub use ntex_service::util::{lazy, Either, Lazy, Ready}; pub use bytes::{Buf, BufMut, Bytes, BytesMut}; pub use bytestring::ByteString; -pub use either::Either; pub type HashMap = std::collections::HashMap; pub type HashSet = std::collections::HashSet; + +/// Creates a new future wrapping around a function returning [`Poll`]. +/// +/// Polling the returned future delegates to the wrapped function. +pub fn poll_fn(f: F) -> impl Future +where + F: FnMut(&mut Context<'_>) -> Poll, +{ + PollFn { f } +} + +/// Future for the [`poll_fn`] function. +#[must_use = "futures do nothing unless you `.await` or poll them"] +struct PollFn { + f: F, +} + +impl Unpin for PollFn {} + +impl Future for PollFn +where + F: FnMut(&mut Context<'_>) -> Poll, +{ + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + (&mut self.f)(cx) + } +} + +/// Creates a future that resolves to the next item in the stream. +pub async fn next(stream: &mut S) -> Option +where + S: crate::Stream + Unpin, +{ + poll_fn(|cx| Pin::new(&mut *stream).poll_next(cx)).await +} + +/// A future that completes after the given item has been fully processed +/// into the sink, including flushing. +pub async fn send(sink: &mut S, item: I) -> Result<(), S::Error> +where + S: crate::Sink + Unpin, +{ + poll_fn(|cx| Pin::new(&mut *sink).poll_ready(cx)).await?; + Pin::new(&mut *sink).start_send(item)?; + poll_fn(|cx| Pin::new(&mut *sink).poll_flush(cx)).await +} + +/// Future for the `join` combinator, waiting for two futures to +/// complete. +pub async fn join(fut_a: A, fut_b: B) -> (A::Output, B::Output) +where + A: Future, + B: Future, +{ + tokio::pin!(fut_a); + tokio::pin!(fut_b); + + let mut res_a = None; + let mut res_b = None; + + poll_fn(|cx| { + if res_a.is_none() { + if let Poll::Ready(item) = Pin::new(&mut fut_a).poll(cx) { + res_a = Some(item) + } + } + if res_b.is_none() { + if let Poll::Ready(item) = Pin::new(&mut fut_b).poll(cx) { + res_b = Some(item) + } + } + if res_a.is_some() && res_b.is_some() { + Poll::Ready(()) + } else { + Poll::Pending + } + }) + .await; + + (res_a.unwrap(), res_b.unwrap()) +} + +/// Waits for either one of two differently-typed futures to complete. +pub async fn select(fut_a: A, fut_b: B) -> Either +where + A: Future, + B: Future, +{ + tokio::pin!(fut_a); + tokio::pin!(fut_b); + + poll_fn(|cx| { + if let Poll::Ready(item) = Pin::new(&mut fut_a).poll(cx) { + Poll::Ready(Either::Left(item)) + } else if let Poll::Ready(item) = Pin::new(&mut fut_b).poll(cx) { + Poll::Ready(Either::Right(item)) + } else { + Poll::Pending + } + }) + .await +} + +enum MaybeDone +where + T: Future, +{ + Pending(T), + Done(T::Output), +} + +/// Creates a future which represents a collection of the outputs of the futures given. +pub async fn join_all(fut: Vec) -> Vec +where + F: Future, +{ + let mut futs: Vec<_> = fut + .into_iter() + .map(|f| MaybeDone::Pending(Box::pin(f))) + .collect(); + + poll_fn(|cx| { + let mut pending = false; + for item in &mut futs { + if let MaybeDone::Pending(ref mut fut) = item { + if let Poll::Ready(res) = fut.as_mut().poll(cx) { + *item = MaybeDone::Done(res); + } else { + pending = true; + } + } + } + if pending { + Poll::Pending + } else { + Poll::Ready(()) + } + }) + .await; + + futs.into_iter() + .map(|item| { + if let MaybeDone::Done(item) = item { + item + } else { + unreachable!() + } + }) + .collect() +} diff --git a/ntex/src/util/sink.rs b/ntex/src/util/sink.rs index b7a924cd..bb79bb88 100644 --- a/ntex/src/util/sink.rs +++ b/ntex/src/util/sink.rs @@ -2,10 +2,9 @@ use std::{ cell::Cell, cell::RefCell, marker::PhantomData, pin::Pin, task::Context, task::Poll, }; -use futures::future::{ready, Ready}; -use futures::Sink; +use futures_sink::Sink; -use crate::service::Service; +use crate::{service::Service, util::Ready}; /// `SinkService` forwards incoming requests to the provided `Sink` pub struct SinkService { @@ -48,7 +47,7 @@ where type Request = I; type Response = (); type Error = S::Error; - type Future = Ready>; + type Future = Ready<(), S::Error>; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { let mut inner = self.sink.borrow_mut(); @@ -78,6 +77,6 @@ where } fn call(&self, req: I) -> Self::Future { - ready(Pin::new(&mut *self.sink.borrow_mut()).start_send(req)) + Ready::result(Pin::new(&mut *self.sink.borrow_mut()).start_send(req)) } } diff --git a/ntex/src/util/stream.rs b/ntex/src/util/stream.rs index 77ec7f6e..e3344bdc 100644 --- a/ntex/src/util/stream.rs +++ b/ntex/src/util/stream.rs @@ -1,9 +1,8 @@ -use std::{fmt, pin::Pin, task::Context, task::Poll}; - -use futures::{ready, Future, FutureExt, Sink, SinkExt, Stream}; +use std::{fmt, future::Future, pin::Pin, task::Context, task::Poll}; use crate::channel::mpsc; use crate::service::{IntoService, Service}; +use crate::{util::poll_fn, Sink, Stream}; pin_project_lite::pin_project! { pub struct Dispatcher @@ -65,12 +64,17 @@ where if let Some(is_err) = this.shutdown { if let Some(mut sink) = this.sink.take() { crate::rt::spawn(async move { - if sink.flush().await.is_ok() { - let _ = sink.close().await; + if poll_fn(|cx| Pin::new(&mut sink).poll_flush(cx)) + .await + .is_ok() + { + let _ = poll_fn(|cx| Pin::new(&mut sink).poll_close(cx)).await; } }); } - ready!(this.service.poll_shutdown(cx, *is_err)); + if let Poll::Pending = this.service.poll_shutdown(cx, *is_err) { + return Poll::Pending; + } return Poll::Ready(()); } @@ -126,9 +130,11 @@ where Poll::Ready(Ok(_)) => match Pin::new(&mut this.stream).poll_next(cx) { Poll::Ready(Some(Ok(item))) => { let tx = this.rx.sender(); - crate::rt::spawn(this.service.call(item).map(move |res| { + let fut = this.service.call(item); + crate::rt::spawn(async move { + let res = fut.await; let _ = tx.send(res); - })); + }); this = self.as_mut().project(); continue; } @@ -157,14 +163,13 @@ where mod tests { use bytes::BytesMut; use bytestring::ByteString; - use futures::{future::ok, StreamExt}; use std::{cell::Cell, rc::Rc, time::Duration}; use super::*; use crate::channel::mpsc; use crate::codec::Encoder; use crate::rt::time::sleep; - use crate::ws; + use crate::{util::next, ws}; #[crate::rt_test] async fn test_basic() { @@ -181,10 +186,12 @@ mod tests { encoder, crate::fn_service(move |_| { counter2.set(counter2.get() + 1); - ok(Some(ws::Message::Text(ByteString::from_static("test")))) + async { Ok(Some(ws::Message::Text(ByteString::from_static("test")))) } }), ); - crate::rt::spawn(disp.map(|_| ())); + crate::rt::spawn(async move { + let _ = disp.await; + }); let mut buf = BytesMut::new(); let codec = ws::Codec::new().client_mode(); @@ -193,12 +200,12 @@ mod tests { .unwrap(); tx.send(Ok::<_, ()>(buf.split().freeze())).unwrap(); - let data = rx.next().await.unwrap().unwrap(); + let data = next(&mut rx).await.unwrap().unwrap(); assert_eq!(data, b"\x81\x04test".as_ref()); drop(tx); sleep(Duration::from_millis(10)).await; - assert!(rx.next().await.is_none()); + assert!(next(&mut rx).await.is_none()); assert_eq!(counter.get(), 1); } diff --git a/ntex/src/util/time.rs b/ntex/src/util/time.rs index bbd3ed3c..e16be404 100644 --- a/ntex/src/util/time.rs +++ b/ntex/src/util/time.rs @@ -157,7 +157,7 @@ impl SystemTimeService { #[cfg(test)] mod tests { use super::*; - use futures::future::lazy; + use crate::util::lazy; use std::time::{Duration, SystemTime}; #[crate::rt_test] diff --git a/ntex/src/util/timeout.rs b/ntex/src/util/timeout.rs index 807250c3..c7ad17e5 100644 --- a/ntex/src/util/timeout.rs +++ b/ntex/src/util/timeout.rs @@ -2,15 +2,11 @@ //! //! If the response does not complete within the specified timeout, the response //! will be aborted. -use std::{ - fmt, future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll, time, -}; - -use futures::future::Either; +use std::{fmt, future::Future, marker, pin::Pin, task::Context, task::Poll, time}; use crate::rt::time::{sleep, Sleep}; use crate::service::{IntoService, Service, Transform}; -use crate::util::Ready; +use crate::util::{Either, Ready}; const ZERO: time::Duration = time::Duration::from_millis(0); @@ -20,7 +16,7 @@ const ZERO: time::Duration = time::Duration::from_millis(0); #[derive(Debug)] pub struct Timeout { timeout: time::Duration, - _t: PhantomData, + _t: marker::PhantomData, } /// Timeout error @@ -74,7 +70,7 @@ impl Timeout { pub fn new(timeout: time::Duration) -> Self { Timeout { timeout, - _t: PhantomData, + _t: marker::PhantomData, } } } @@ -222,12 +218,12 @@ where #[cfg(test)] mod tests { use derive_more::Display; - use futures::future::{lazy, ok}; use std::task::{Context, Poll}; use std::time::Duration; use super::*; use crate::service::{apply, fn_factory, Service, ServiceFactory}; + use crate::util::lazy; #[derive(Clone, Debug, PartialEq)] struct SleepService(Duration); @@ -303,7 +299,7 @@ mod tests { let timeout = apply( Timeout::new(resolution).clone(), - fn_factory(|| ok::<_, ()>(SleepService(wait_time))), + fn_factory(|| async { Ok::<_, ()>(SleepService(wait_time)) }), ); let srv = timeout.new_service(&()).await.unwrap(); diff --git a/ntex/src/util/variant.rs b/ntex/src/util/variant.rs index fb5b784b..89371576 100644 --- a/ntex/src/util/variant.rs +++ b/ntex/src/util/variant.rs @@ -319,11 +319,11 @@ variant_impl_and!(VariantFactory7, VariantFactory8, V8, v8, (V2, V3, V4, V5, V6, #[cfg(test)] mod tests { - use futures::future::{lazy, ok, Ready}; use std::task::{Context, Poll}; use super::*; use crate::service::{fn_factory, Service, ServiceFactory}; + use crate::util::{lazy, Ready}; #[derive(Clone)] struct Srv1; @@ -332,7 +332,7 @@ mod tests { type Request = (); type Response = usize; type Error = (); - type Future = Ready>; + type Future = Ready; fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) @@ -343,7 +343,7 @@ mod tests { } fn call(&self, _: ()) -> Self::Future { - ok::<_, ()>(1) + Ready::<_, ()>::ok(1) } } @@ -354,7 +354,7 @@ mod tests { type Request = (); type Response = usize; type Error = (); - type Future = Ready>; + type Future = Ready; fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) @@ -365,15 +365,15 @@ mod tests { } fn call(&self, _: ()) -> Self::Future { - ok::<_, ()>(2) + Ready::<_, ()>::ok(2) } } #[crate::rt_test] async fn test_variant() { - let factory = variant(fn_factory(|| ok::<_, ()>(Srv1))) - .and(fn_factory(|| ok::<_, ()>(Srv2))) - .and(fn_factory(|| ok::<_, ()>(Srv2))) + let factory = variant(fn_factory(|| async { Ok::<_, ()>(Srv1) })) + .and(fn_factory(|| async { Ok::<_, ()>(Srv2) })) + .and(fn_factory(|| async { Ok::<_, ()>(Srv2) })) .clone(); let service = factory.new_service(&()).await.clone().unwrap(); diff --git a/ntex/src/web/app.rs b/ntex/src/web/app.rs index 6c21b48f..93b69a6e 100644 --- a/ntex/src/web/app.rs +++ b/ntex/src/web/app.rs @@ -1,13 +1,11 @@ use std::{cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc}; -use futures::future::Either; - use crate::http::Request; use crate::router::ResourceDef; use crate::service::boxed::{self, BoxServiceFactory}; use crate::service::{apply, apply_fn_factory, pipeline_factory}; use crate::service::{IntoServiceFactory, Service, ServiceFactory, Transform}; -use crate::util::{Extensions, Ready}; +use crate::util::{Either, Extensions, Ready}; use super::app_service::{AppEntry, AppFactory, AppRoutingFactory}; use super::config::{AppConfig, ServiceConfig}; @@ -139,7 +137,7 @@ where Box::pin(async move { match fut.await { Err(e) => { - log::error!("Can not construct data instance: {:?}", e); + log::error!("Cannot construct data instance: {:?}", e); Err(()) } Ok(data) => { @@ -293,7 +291,7 @@ where { // create and configure default resource self.default = Some(Rc::new(boxed::factory(f.into_factory().map_init_err( - |e| log::error!("Can not construct default service: {:?}", e), + |e| log::error!("Cannot construct default service: {:?}", e), )))); self @@ -628,7 +626,6 @@ where #[cfg(test)] mod tests { use bytes::Bytes; - use futures::future::ok; use super::*; use crate::http::header::{self, HeaderValue}; @@ -659,13 +656,13 @@ mod tests { .service(web::resource("/test").to(|| async { HttpResponse::Ok() })) .service( web::resource("/test2") - .default_service(|r: WebRequest| { - ok(r.into_response(HttpResponse::Created())) + .default_service(|r: WebRequest| async move { + Ok(r.into_response(HttpResponse::Created())) }) .route(web::get().to(|| async { HttpResponse::Ok() })), ) - .default_service(|r: WebRequest| { - ok(r.into_response(HttpResponse::MethodNotAllowed())) + .default_service(|r: WebRequest| async move { + Ok(r.into_response(HttpResponse::MethodNotAllowed())) }), ) .await; @@ -688,10 +685,12 @@ mod tests { #[crate::rt_test] async fn test_data_factory() { let srv = init_service( - App::new().data_factory(|| ok::<_, ()>(10usize)).service( - web::resource("/") - .to(|_: web::types::Data| async { HttpResponse::Ok() }), - ), + App::new() + .data_factory(|| async { Ok::<_, ()>(10usize) }) + .service( + web::resource("/") + .to(|_: web::types::Data| async { HttpResponse::Ok() }), + ), ) .await; let req = TestRequest::default().to_request(); @@ -699,10 +698,12 @@ mod tests { assert_eq!(resp.status(), StatusCode::OK); let srv = init_service( - App::new().data_factory(|| ok::<_, ()>(10u32)).service( - web::resource("/") - .to(|_: web::types::Data| async { HttpResponse::Ok() }), - ), + App::new() + .data_factory(|| async { Ok::<_, ()>(10u32) }) + .service( + web::resource("/") + .to(|_: web::types::Data| async { HttpResponse::Ok() }), + ), ) .await; let req = TestRequest::default().to_request(); diff --git a/ntex/src/web/error.rs b/ntex/src/web/error.rs index 46e3e1df..c3e43487 100644 --- a/ntex/src/web/error.rs +++ b/ntex/src/web/error.rs @@ -1,13 +1,9 @@ //! Web error -use std::cell::RefCell; -use std::fmt; -use std::io::Write; -use std::marker::PhantomData; +use std::{cell::RefCell, fmt, io::Write, marker::PhantomData}; use bytes::BytesMut; use derive_more::{Display, From}; -pub use futures::channel::oneshot::Canceled; pub use http::Error as HttpError; pub use serde_json::error::Error as JsonError; pub use url::ParseError as UrlParseError; @@ -16,6 +12,7 @@ use super::{HttpRequest, HttpResponse}; use crate::http::body::Body; use crate::http::helpers::Writer; use crate::http::{error, header, StatusCode}; +use crate::util::Either; pub use super::error_default::{DefaultError, Error}; pub use crate::http::error::BlockingError; @@ -59,7 +56,7 @@ where impl WebResponseError for std::convert::Infallible {} -impl WebResponseError for either::Either +impl WebResponseError for Either where A: WebResponseError, B: WebResponseError, @@ -67,15 +64,15 @@ where { fn status_code(&self) -> StatusCode { match self { - either::Either::Left(ref a) => a.status_code(), - either::Either::Right(ref b) => b.status_code(), + Either::Left(ref a) => a.status_code(), + Either::Right(ref b) => b.status_code(), } } fn error_response(&self, req: &HttpRequest) -> HttpResponse { match self { - either::Either::Left(ref a) => a.error_response(req), - either::Either::Right(ref b) => b.error_response(req), + Either::Left(ref a) => a.error_response(req), + Either::Right(ref b) => b.error_response(req), } } } @@ -104,8 +101,8 @@ pub enum UrlGenerationError { /// A set of errors that can occur during parsing urlencoded payloads #[derive(Debug, Display, From)] pub enum UrlencodedError { - /// Can not decode chunked transfer encoding - #[display(fmt = "Can not decode chunked transfer encoding")] + /// Cannot decode chunked transfer encoding + #[display(fmt = "Cannot decode chunked transfer encoding")] Chunked, /// Payload size is bigger than allowed. (default: 256kB) #[display( @@ -170,7 +167,7 @@ pub enum PayloadError { Payload(error::PayloadError), #[display(fmt = "{}", _0)] ContentType(error::ContentTypeError), - #[display(fmt = "Can not decode body")] + #[display(fmt = "Cannot decode body")] Decoding, } @@ -760,15 +757,15 @@ mod tests { fn test_either_error() { let req = TestRequest::default().to_http_request(); - let err: either::Either = - either::Either::Left(SendRequestError::TunnelNotSupported); + let err: Either = + Either::Left(SendRequestError::TunnelNotSupported); let code = WebResponseError::::status_code(&err); assert_eq!(code, StatusCode::INTERNAL_SERVER_ERROR); let resp = WebResponseError::::error_response(&err, &req); assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR); - let err: either::Either = - either::Either::Right(PayloadError::Decoding); + let err: Either = + Either::Right(PayloadError::Decoding); let code = WebResponseError::::status_code(&err); assert_eq!(code, StatusCode::BAD_REQUEST); let resp = WebResponseError::::error_response(&err, &req); diff --git a/ntex/src/web/extract.rs b/ntex/src/web/extract.rs index 85064a2c..ca5b8673 100644 --- a/ntex/src/web/extract.rs +++ b/ntex/src/web/extract.rs @@ -35,10 +35,9 @@ pub trait FromRequest: Sized { /// ```rust /// use ntex::{http, util::Ready}; /// use ntex::web::{self, error, App, HttpRequest, FromRequest, DefaultError}; -/// use serde_derive::Deserialize; /// use rand; /// -/// #[derive(Debug, Deserialize)] +/// #[derive(Debug, serde::Deserialize)] /// struct Thing { /// name: String /// } @@ -106,10 +105,9 @@ where /// ```rust /// use ntex::{http, util::Ready}; /// use ntex::web::{self, error, App, HttpRequest, FromRequest}; -/// use serde_derive::Deserialize; /// use rand; /// -/// #[derive(Debug, Deserialize)] +/// #[derive(Debug, serde::Deserialize)] /// struct Thing { /// name: String /// } @@ -255,14 +253,13 @@ tuple_from_req!(TupleFromRequest10, (0, A), (1, B), (2, C), (3, D), (4, E), (5, #[cfg(test)] mod tests { use bytes::Bytes; - use serde_derive::Deserialize; use crate::http::header; use crate::web::error::UrlencodedError; use crate::web::test::{from_request, TestRequest}; use crate::web::types::{Form, FormConfig}; - #[derive(Deserialize, Debug, PartialEq)] + #[derive(serde::Deserialize, Debug, PartialEq)] struct Info { hello: String, } diff --git a/ntex/src/web/httprequest.rs b/ntex/src/web/httprequest.rs index f078b34d..572c8e5c 100644 --- a/ntex/src/web/httprequest.rs +++ b/ntex/src/web/httprequest.rs @@ -259,9 +259,8 @@ impl Drop for HttpRequest { /// /// ```rust /// use ntex::web::{self, App, HttpRequest}; -/// use serde_derive::Deserialize; /// -/// /// extract `Thing` from request +/// /// extract `HttpRequest` from request /// async fn index(req: HttpRequest) -> String { /// format!("Got thing: {:?}", req) /// } diff --git a/ntex/src/web/middleware/compress.rs b/ntex/src/web/middleware/compress.rs index 9ee824cf..38479abb 100644 --- a/ntex/src/web/middleware/compress.rs +++ b/ntex/src/web/middleware/compress.rs @@ -134,8 +134,8 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - match futures::ready!(this.fut.poll(cx)) { - Ok(resp) => { + match this.fut.poll(cx)? { + Poll::Ready(resp) => { let enc = if let Some(enc) = resp.response().get_encoding() { enc } else { @@ -146,7 +146,7 @@ where resp.map_body(move |head, body| Encoder::response(enc, head, body)) )) } - Err(e) => Poll::Ready(Err(e)), + Poll::Pending => Poll::Pending, } } } diff --git a/ntex/src/web/middleware/defaultheaders.rs b/ntex/src/web/middleware/defaultheaders.rs index 78af0ad5..14038255 100644 --- a/ntex/src/web/middleware/defaultheaders.rs +++ b/ntex/src/web/middleware/defaultheaders.rs @@ -71,9 +71,9 @@ impl DefaultHeaders { .headers .append(key, value); } - Err(_) => panic!("Can not create header value"), + Err(_) => panic!("Cannot create header value"), }, - Err(_) => panic!("Can not create header name"), + Err(_) => panic!("Cannot create header name"), } self } @@ -161,11 +161,10 @@ where #[cfg(test)] mod tests { - use futures::future::lazy; - use super::*; use crate::http::header::CONTENT_TYPE; use crate::service::IntoService; + use crate::util::lazy; use crate::web::request::WebRequest; use crate::web::test::{ok_service, TestRequest}; use crate::web::{DefaultError, Error, HttpResponse}; diff --git a/ntex/src/web/middleware/logger.rs b/ntex/src/web/middleware/logger.rs index 164774d2..03f9ebac 100644 --- a/ntex/src/web/middleware/logger.rs +++ b/ntex/src/web/middleware/logger.rs @@ -1,17 +1,15 @@ //! Request logging middleware use std::fmt::{self, Display}; use std::task::{Context, Poll}; -use std::{convert::TryFrom, env, error::Error, future::Future, pin::Pin, rc::Rc}; +use std::{convert::TryFrom, env, error::Error, future::Future, pin::Pin, rc::Rc, time}; use bytes::Bytes; -use futures::future::Either; use regex::Regex; -use time::OffsetDateTime; use crate::http::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::http::header::HeaderName; use crate::service::{Service, Transform}; -use crate::util::{HashSet, Ready}; +use crate::util::{Either, HashSet, Ready}; use crate::web::dev::{WebRequest, WebResponse}; use crate::web::HttpResponse; @@ -166,7 +164,7 @@ where if self.inner.exclude.contains(req.path()) { Either::Right(self.service.call(req)) } else { - let time = OffsetDateTime::now_utc(); + let time = time::SystemTime::now(); let mut format = self.inner.format.clone(); for unit in &mut format.0 { @@ -187,7 +185,7 @@ pin_project_lite::pin_project! { { #[pin] fut: S::Future, - time: OffsetDateTime, + time: time::SystemTime, format: Option, } } @@ -201,9 +199,10 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - let res = match futures::ready!(this.fut.poll(cx)) { - Ok(res) => res, - Err(e) => return Poll::Ready(Err(e)), + let res = match this.fut.poll(cx) { + Poll::Ready(Ok(res)) => res, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => return Poll::Pending, }; if let Some(ref mut format) = this.format { @@ -230,7 +229,7 @@ struct StreamLog { body: ResponseBody, format: Option, size: usize, - time: OffsetDateTime, + time: time::SystemTime, } impl Drop for StreamLog { @@ -357,20 +356,20 @@ impl FormatText { &self, fmt: &mut fmt::Formatter<'_>, size: usize, - entry_time: OffsetDateTime, + entry_time: time::SystemTime, ) -> Result<(), fmt::Error> { match *self { FormatText::Str(ref string) => fmt.write_str(string), FormatText::Percent => "%".fmt(fmt), FormatText::ResponseSize => size.fmt(fmt), FormatText::Time => { - let rt = OffsetDateTime::now_utc() - entry_time; - let rt = rt.as_seconds_f64(); + let rt = entry_time.elapsed().unwrap(); + let rt = rt.as_secs_f64(); fmt.write_fmt(format_args!("{:.6}", rt)) } FormatText::TimeMillis => { - let rt = OffsetDateTime::now_utc() - entry_time; - let rt = (rt.whole_nanoseconds() as f64) / 1_000_000.0; + let rt = entry_time.elapsed().unwrap(); + let rt = (rt.as_nanos() as f64) / 1_000_000.0; fmt.write_fmt(format_args!("{:.6}", rt)) } FormatText::EnvironHeader(ref name) => { @@ -405,7 +404,7 @@ impl FormatText { } } - fn render_request(&mut self, now: OffsetDateTime, req: &WebRequest) { + fn render_request(&mut self, now: time::SystemTime, req: &WebRequest) { match *self { FormatText::RequestLine => { *self = if req.query_string().is_empty() { @@ -427,7 +426,7 @@ impl FormatText { } FormatText::UrlPath => *self = FormatText::Str(req.path().to_string()), FormatText::RequestTime => { - *self = FormatText::Str(now.format("%Y-%m-%dT%H:%M:%S")) + *self = FormatText::Str(httpdate::HttpDate::from(now).to_string()) } FormatText::RequestHeader(ref name) => { let s = if let Some(val) = req.headers().get(name) { @@ -466,18 +465,17 @@ impl<'a> fmt::Display for FormatDisplay<'a> { #[cfg(test)] mod tests { - use futures::future::{lazy, ok}; - use super::*; use crate::http::{header, StatusCode}; use crate::service::{IntoService, Service, Transform}; + use crate::util::lazy; use crate::web::test::{self, TestRequest}; use crate::web::{DefaultError, Error}; #[crate::rt_test] async fn test_logger() { - let srv = |req: WebRequest| { - ok::<_, Error>( + let srv = |req: WebRequest| async move { + Ok::<_, Error>( req.into_response( HttpResponse::build(StatusCode::OK) .header("X-Test", "ttt") @@ -523,7 +521,7 @@ mod tests { .uri("/test/route/yeah?q=test") .to_srv_request(); - let now = OffsetDateTime::now_utc(); + let now = time::SystemTime::now(); for unit in &mut format.0 { unit.render_request(now, &req); } @@ -553,7 +551,7 @@ mod tests { ) .to_srv_request(); - let now = OffsetDateTime::now_utc(); + let now = time::SystemTime::now(); for unit in &mut format.0 { unit.render_request(now, &req); } @@ -563,7 +561,7 @@ mod tests { unit.render_response(&resp); } - let entry_time = OffsetDateTime::now_utc(); + let entry_time = time::SystemTime::now(); let render = |fmt: &mut fmt::Formatter<'_>| { for unit in &format.0 { unit.render(fmt, 1024, entry_time)?; @@ -581,7 +579,7 @@ mod tests { let mut format = Format::new("%t"); let req = TestRequest::default().to_srv_request(); - let now = OffsetDateTime::now_utc(); + let now = time::SystemTime::now(); for unit in &mut format.0 { unit.render_request(now, &req); } @@ -598,6 +596,6 @@ mod tests { Ok(()) }; let s = format!("{}", FormatDisplay(&render)); - assert!(s.contains(&now.format("%Y-%m-%dT%H:%M:%S"))); + assert!(s.contains(&httpdate::HttpDate::from(now).to_string())); } } diff --git a/ntex/src/web/mod.rs b/ntex/src/web/mod.rs index a9540c5f..3c5e99d4 100644 --- a/ntex/src/web/mod.rs +++ b/ntex/src/web/mod.rs @@ -100,7 +100,6 @@ pub use ntex_macros::web_trace as trace; pub use crate::http::Response as HttpResponse; pub use crate::http::ResponseBuilder as HttpResponseBuilder; -pub use either::Either; pub use self::app::App; pub use self::config::ServiceConfig; diff --git a/ntex/src/web/resource.rs b/ntex/src/web/resource.rs index be1f9ac4..70da10d0 100644 --- a/ntex/src/web/resource.rs +++ b/ntex/src/web/resource.rs @@ -2,14 +2,12 @@ use std::{ cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll, }; -use futures::future::Either; - use crate::http::Response; use crate::router::{IntoPattern, ResourceDef}; use crate::service::boxed::{self, BoxService, BoxServiceFactory}; use crate::service::{apply, apply_fn_factory, pipeline_factory}; use crate::service::{IntoServiceFactory, Service, ServiceFactory, Transform}; -use crate::util::{Extensions, Ready}; +use crate::util::{Either, Extensions, Ready}; use super::dev::{insert_slesh, WebServiceConfig, WebServiceFactory}; use super::error::ErrorRenderer; @@ -412,7 +410,7 @@ where // create and configure default resource self.default = Rc::new(RefCell::new(Some(Rc::new(boxed::factory( f.into_factory().map_init_err(|e| { - log::error!("Can not construct default service: {:?}", e) + log::error!("Cannot construct default service: {:?}", e) }), ))))); @@ -580,8 +578,6 @@ impl ServiceFactory for ResourceEndpoint { mod tests { use std::time::Duration; - use futures::future::Either; - use crate::http::header::{self, HeaderValue}; use crate::http::{Method, StatusCode}; use crate::rt::time::sleep; @@ -589,7 +585,7 @@ mod tests { use crate::web::request::WebRequest; use crate::web::test::{call_service, init_service, TestRequest}; use crate::web::{self, guard, App, DefaultError, HttpResponse}; - use crate::{fn_service, Service}; + use crate::{fn_service, util::Either, Service}; #[crate::rt_test] async fn test_filter() { diff --git a/ntex/src/web/responder.rs b/ntex/src/web/responder.rs index ecd905ba..a794484b 100644 --- a/ntex/src/web/responder.rs +++ b/ntex/src/web/responder.rs @@ -1,22 +1,37 @@ -use std::convert::TryFrom; -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; use std::task::{Context, Poll}; +use std::{convert::TryFrom, future::Future, marker::PhantomData, pin::Pin}; use bytes::{Bytes, BytesMut}; -use futures::future::{ready, Either as EitherFuture, Ready}; -use futures::ready; use crate::http::error::HttpError; use crate::http::header::{HeaderMap, HeaderName, HeaderValue}; use crate::http::{Response, ResponseBuilder, StatusCode}; +use crate::util::Either; use super::error::{ DefaultError, ErrorContainer, ErrorRenderer, InternalError, WebResponseError, }; use super::httprequest::HttpRequest; +pub struct Ready(Option); + +impl Unpin for Ready {} + +impl From for Ready { + fn from(t: T) -> Self { + Ready(Some(t)) + } +} + +impl Future for Ready { + type Output = T; + + #[inline] + fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { + Poll::Ready(self.0.take().expect("Ready polled after completion")) + } +} + /// Trait implemented by types that can be converted to a http response. /// /// Types that implement this trait can be used as the return type of a handler. @@ -85,7 +100,7 @@ impl Responder for Response { #[inline] fn respond_to(self, _: &HttpRequest) -> Self::Future { - ready(self) + Ready(Some(self)) } } @@ -95,7 +110,7 @@ impl Responder for ResponseBuilder { #[inline] fn respond_to(mut self, _: &HttpRequest) -> Self::Future { - ready(self.finish()) + Ready(Some(self.finish())) } } @@ -105,14 +120,14 @@ where Err: ErrorRenderer, { type Error = T::Error; - type Future = EitherFuture>; + type Future = Either>; fn respond_to(self, req: &HttpRequest) -> Self::Future { match self { - Some(t) => EitherFuture::Left(t.respond_to(req)), - None => EitherFuture::Right(ready( + Some(t) => Either::Left(t.respond_to(req)), + None => Either::Right(Ready(Some( Response::build(StatusCode::NOT_FOUND).finish(), - )), + ))), } } } @@ -124,12 +139,12 @@ where Err: ErrorRenderer, { type Error = T::Error; - type Future = EitherFuture>; + type Future = Either>; fn respond_to(self, req: &HttpRequest) -> Self::Future { match self { - Ok(val) => EitherFuture::Left(val.respond_to(req)), - Err(e) => EitherFuture::Right(ready(e.into().error_response(req))), + Ok(val) => Either::Left(val.respond_to(req)), + Err(e) => Either::Right(Ready(Some(e.into().error_response(req)))), } } } @@ -156,11 +171,11 @@ impl Responder for &'static str { type Future = Ready; fn respond_to(self, _: &HttpRequest) -> Self::Future { - ready( + Ready(Some( Response::build(StatusCode::OK) .content_type("text/plain; charset=utf-8") .body(self), - ) + )) } } @@ -169,11 +184,11 @@ impl Responder for &'static [u8] { type Future = Ready; fn respond_to(self, _: &HttpRequest) -> Self::Future { - ready( + Ready(Some( Response::build(StatusCode::OK) .content_type("application/octet-stream") .body(self), - ) + )) } } @@ -182,11 +197,11 @@ impl Responder for String { type Future = Ready; fn respond_to(self, _: &HttpRequest) -> Self::Future { - ready( + Ready(Some( Response::build(StatusCode::OK) .content_type("text/plain; charset=utf-8") .body(self), - ) + )) } } @@ -195,11 +210,11 @@ impl<'a, Err: ErrorRenderer> Responder for &'a String { type Future = Ready; fn respond_to(self, _: &HttpRequest) -> Self::Future { - ready( + Ready(Some( Response::build(StatusCode::OK) .content_type("text/plain; charset=utf-8") .body(self), - ) + )) } } @@ -208,11 +223,11 @@ impl Responder for Bytes { type Future = Ready; fn respond_to(self, _: &HttpRequest) -> Self::Future { - ready( + Ready(Some( Response::build(StatusCode::OK) .content_type("application/octet-stream") .body(self), - ) + )) } } @@ -221,11 +236,11 @@ impl Responder for BytesMut { type Future = Ready; fn respond_to(self, _: &HttpRequest) -> Self::Future { - ready( + Ready(Some( Response::build(StatusCode::OK) .content_type("application/octet-stream") .body(self), - ) + )) } } @@ -336,7 +351,12 @@ impl, Err: ErrorRenderer> Future for CustomResponderFut, cx: &mut Context<'_>) -> Poll { let this = self.project(); - let mut res = ready!(this.fut.poll(cx)); + let mut res = if let Poll::Ready(res) = this.fut.poll(cx) { + res + } else { + return Poll::Pending; + }; + if let Some(status) = this.status.take() { *res.status_mut() = status; } @@ -352,7 +372,7 @@ impl, Err: ErrorRenderer> Future for CustomResponderFut Either { /// if is_a_variant() { @@ -366,19 +386,19 @@ impl, Err: ErrorRenderer> Future for CustomResponderFut bool { true } /// # fn main() {} /// ``` -impl Responder for either::Either +impl Responder for Either where A: Responder, B: Responder, Err: ErrorRenderer, { type Error = Err::Container; - type Future = EitherFuture; + type Future = Either; fn respond_to(self, req: &HttpRequest) -> Self::Future { match self { - either::Either::Left(a) => EitherFuture::Left(a.respond_to(req)), - either::Either::Right(b) => EitherFuture::Right(b.respond_to(req)), + Either::Left(a) => Either::Left(a.respond_to(req)), + Either::Right(b) => Either::Right(b.respond_to(req)), } } } @@ -392,7 +412,7 @@ where type Future = Ready; fn respond_to(self, req: &HttpRequest) -> Self::Future { - ready(self.error_response(req)) + Ready(Some(self.error_response(req))) } } @@ -419,9 +439,9 @@ pub(crate) mod tests { let srv = init_service(web::App::new().service( web::resource("/index.html").to(|req: HttpRequest| async move { if req.query_string().is_empty() { - either::Either::Left(HttpResponse::BadRequest()) + Either::Left(HttpResponse::BadRequest()) } else { - either::Either::Right("hello") + Either::Right("hello") } }), )) diff --git a/ntex/src/web/route.rs b/ntex/src/web/route.rs index 7fe39998..b9004475 100644 --- a/ntex/src/web/route.rs +++ b/ntex/src/web/route.rs @@ -146,9 +146,8 @@ impl Route { /// /// ```rust /// use ntex::web; - /// use serde_derive::Deserialize; /// - /// #[derive(Deserialize)] + /// #[derive(serde::Deserialize)] /// struct Info { /// username: String, /// } @@ -170,10 +169,9 @@ impl Route { /// /// ```rust /// # use std::collections::HashMap; - /// # use serde_derive::Deserialize; /// use ntex::web; /// - /// #[derive(Deserialize)] + /// #[derive(serde::Deserialize)] /// struct Info { /// username: String, /// } @@ -279,14 +277,13 @@ mod tests { use std::time::Duration; use bytes::Bytes; - use serde_derive::Serialize; use crate::http::{Method, StatusCode}; use crate::rt::time::sleep; use crate::web::test::{call_service, init_service, read_body, TestRequest}; use crate::web::{self, error, App, DefaultError, HttpResponse}; - #[derive(Serialize, PartialEq, Debug)] + #[derive(serde::Serialize, PartialEq, Debug)] struct MyObject { name: String, } diff --git a/ntex/src/web/scope.rs b/ntex/src/web/scope.rs index 17abf9ba..6eded0a0 100644 --- a/ntex/src/web/scope.rs +++ b/ntex/src/web/scope.rs @@ -2,14 +2,12 @@ use std::{ cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll, }; -use futures::future::Either; - use crate::http::Response; use crate::router::{IntoPattern, ResourceDef, ResourceInfo, Router}; use crate::service::boxed::{self, BoxService, BoxServiceFactory}; use crate::service::{apply, apply_fn_factory, pipeline_factory}; use crate::service::{IntoServiceFactory, Service, ServiceFactory, Transform}; -use crate::util::{Extensions, Ready}; +use crate::util::{Either, Extensions, Ready}; use super::config::ServiceConfig; use super::dev::{WebServiceConfig, WebServiceFactory}; @@ -306,7 +304,7 @@ where // create and configure default resource self.default = Rc::new(RefCell::new(Some(Rc::new(boxed::factory( f.into_factory().map_init_err(|e| { - log::error!("Can not construct default service: {:?}", e) + log::error!("Cannot construct default service: {:?}", e) }), ))))); @@ -677,13 +675,11 @@ impl ServiceFactory for ScopeEndpoint { #[cfg(test)] mod tests { - use bytes::Bytes; - use futures::future::Either; - use crate::http::body::{Body, ResponseBody}; use crate::http::header::{HeaderValue, CONTENT_TYPE}; use crate::http::{Method, StatusCode}; use crate::service::{fn_service, Service}; + use crate::util::{Bytes, Either}; use crate::web::middleware::DefaultHeaders; use crate::web::request::WebRequest; use crate::web::test::{call_service, init_service, read_body, TestRequest}; diff --git a/ntex/src/web/server.rs b/ntex/src/web/server.rs index 4af4efb1..965e1e8d 100644 --- a/ntex/src/web/server.rs +++ b/ntex/src/web/server.rs @@ -1,13 +1,9 @@ -use std::marker::PhantomData; -use std::sync::{Arc, Mutex}; -use std::{fmt, io, net}; +use std::{fmt, io, marker::PhantomData, net, sync::Arc, sync::Mutex}; #[cfg(feature = "openssl")] use crate::server::openssl::{AlpnError, SslAcceptor, SslAcceptorBuilder}; #[cfg(feature = "rustls")] use crate::server::rustls::ServerConfig as RustlsServerConfig; -#[cfg(unix)] -use futures::future::ok; #[cfg(unix)] use crate::http::Protocol; @@ -433,7 +429,7 @@ where } else { Err(io::Error::new( io::ErrorKind::Other, - "Can not bind to address.", + "Cannot bind to address.", )) } } else { @@ -505,7 +501,10 @@ where socket_addr, c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)), ); - pipeline_factory(|io: UnixStream| ok((io, Protocol::Http1, None))).and_then( + pipeline_factory(|io: UnixStream| { + crate::util::Ready::ok((io, Protocol::Http1, None)) + }) + .and_then( HttpService::build() .keep_alive(c.keep_alive) .client_timeout(c.client_timeout) @@ -543,14 +542,16 @@ where socket_addr, c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)), ); - pipeline_factory(|io: UnixStream| ok((io, Protocol::Http1, None))) - .and_then( - HttpService::build() - .keep_alive(c.keep_alive) - .client_timeout(c.client_timeout) - .buffer_params(c.read_hw, c.write_hw, c.lw) - .finish(map_config(factory(), move |_| config.clone())), - ) + pipeline_factory(|io: UnixStream| { + crate::util::Ready::ok((io, Protocol::Http1, None)) + }) + .and_then( + HttpService::build() + .keep_alive(c.keep_alive) + .client_timeout(c.client_timeout) + .buffer_params(c.read_hw, c.write_hw, c.lw) + .finish(map_config(factory(), move |_| config.clone())), + ) }, )?; Ok(self) diff --git a/ntex/src/web/service.rs b/ntex/src/web/service.rs index 500f93ce..676ad607 100644 --- a/ntex/src/web/service.rs +++ b/ntex/src/web/service.rs @@ -365,8 +365,6 @@ tuple_web_service!((0,A),(1,B),(2,C),(3,D),(4,E),(5,F),(6,G),(7,H),(8,I),(9,J),( #[cfg(test)] mod tests { - use futures::future::ok; - use super::*; use crate::http::{Method, StatusCode}; use crate::service::Service; @@ -398,8 +396,8 @@ mod tests { async fn test_service() { let srv = init_service(App::new().service( web::service("/test").name("test").finish( - |req: WebRequest| { - ok(req.into_response(HttpResponse::Ok().finish())) + |req: WebRequest| async move { + Ok(req.into_response(HttpResponse::Ok().finish())) }, ), )) @@ -410,8 +408,8 @@ mod tests { let srv = init_service(App::new().service( web::service("/test").guard(guard::Get()).finish( - |req: WebRequest| { - ok(req.into_response(HttpResponse::Ok().finish())) + |req: WebRequest| async move { + Ok(req.into_response(HttpResponse::Ok().finish())) }, ), )) diff --git a/ntex/src/web/test.rs b/ntex/src/web/test.rs index bf201748..cd9c6c65 100644 --- a/ntex/src/web/test.rs +++ b/ntex/src/web/test.rs @@ -5,8 +5,7 @@ use std::{ }; use bytes::{Bytes, BytesMut}; -use futures::future::ok; -use futures::stream::{Stream, StreamExt}; +use futures_core::Stream; use serde::de::DeserializeOwned; use serde::Serialize; @@ -24,7 +23,7 @@ use crate::http::{HttpService, Method, Payload, Request, StatusCode, Uri, Versio use crate::router::{Path, ResourceDef}; use crate::rt::{time::sleep, System}; use crate::server::Server; -use crate::util::Extensions; +use crate::util::{next, Extensions, Ready}; use crate::{map_config, IntoService, IntoServiceFactory, Service, ServiceFactory}; use crate::web::config::AppConfig; @@ -52,7 +51,7 @@ pub fn default_service( Error = std::convert::Infallible, > { (move |req: WebRequest| { - ok(req.into_response(HttpResponse::build(status_code).finish())) + Ready::ok(req.into_response(HttpResponse::build(status_code).finish())) }) .into_service() } @@ -165,7 +164,7 @@ where let mut body = resp.take_body(); let mut bytes = BytesMut::new(); - while let Some(item) = body.next().await { + while let Some(item) = next(&mut body).await { bytes.extend_from_slice(&item.unwrap()); } bytes.freeze() @@ -201,7 +200,7 @@ where pub async fn read_body(mut res: WebResponse) -> Bytes { let mut body = res.take_body(); let mut bytes = BytesMut::new(); - while let Some(item) = body.next().await { + while let Some(item) = next(&mut body).await { bytes.extend_from_slice(&item.unwrap()); } bytes.freeze() @@ -213,7 +212,7 @@ where S: Stream>> + Unpin, { let mut data = BytesMut::new(); - while let Some(item) = stream.next().await { + while let Some(item) = next(&mut stream).await { data.extend_from_slice(&item?); } Ok(data.freeze()) @@ -735,7 +734,7 @@ where builder.set_verify(SslVerifyMode::NONE); let _ = builder .set_alpn_protos(b"\x02h2\x08http/1.1") - .map_err(|e| log::error!("Can not set alpn protocol: {:?}", e)); + .map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e)); Connector::default() .lifetime(time::Duration::from_secs(0)) .keep_alive(time::Duration::from_millis(30000)) diff --git a/ntex/src/web/types/form.rs b/ntex/src/web/types/form.rs index 0a4bd1cc..e4ad9dff 100644 --- a/ntex/src/web/types/form.rs +++ b/ntex/src/web/types/form.rs @@ -4,8 +4,6 @@ use std::{fmt, future::Future, ops, pin::Pin, task::Context, task::Poll}; use bytes::BytesMut; use encoding_rs::{Encoding, UTF_8}; -use futures::future::{ready, Ready}; -use futures::StreamExt; use serde::de::DeserializeOwned; use serde::Serialize; @@ -13,8 +11,10 @@ use serde::Serialize; use crate::http::encoding::Decoder; use crate::http::header::{CONTENT_LENGTH, CONTENT_TYPE}; use crate::http::{HttpMessage, Payload, Response, StatusCode}; +use crate::util::next; use crate::web::error::{ErrorRenderer, UrlencodedError, WebResponseError}; -use crate::web::{FromRequest, HttpRequest, Responder}; +use crate::web::responder::{Ready, Responder}; +use crate::web::{FromRequest, HttpRequest}; /// Form data helper (`application/x-www-form-urlencoded`) /// @@ -32,9 +32,8 @@ use crate::web::{FromRequest, HttpRequest, Responder}; /// ### Example /// ```rust /// use ntex::web; -/// use serde_derive::Deserialize; /// -/// #[derive(Deserialize)] +/// #[derive(serde::Deserialize)] /// struct FormData { /// username: String, /// } @@ -57,9 +56,8 @@ use crate::web::{FromRequest, HttpRequest, Responder}; /// ### Example /// ```rust /// use ntex::web; -/// use serde_derive::Serialize; /// -/// #[derive(Serialize)] +/// #[derive(serde::Serialize)] /// struct SomeForm { /// name: String, /// age: u8 @@ -147,14 +145,13 @@ where fn respond_to(self, req: &HttpRequest) -> Self::Future { let body = match serde_urlencoded::to_string(&self.0) { Ok(body) => body, - Err(e) => return ready(e.error_response(req)), + Err(e) => return e.error_response(req).into(), }; - ready( - Response::build(StatusCode::OK) - .header(CONTENT_TYPE, "application/x-www-form-urlencoded") - .body(body), - ) + Response::build(StatusCode::OK) + .header(CONTENT_TYPE, "application/x-www-form-urlencoded") + .body(body) + .into() } } @@ -162,9 +159,8 @@ where /// /// ```rust /// use ntex::web::{self, App, Error, FromRequest}; -/// use serde_derive::Deserialize; /// -/// #[derive(Deserialize)] +/// #[derive(serde::Deserialize)] /// struct FormData { /// username: String, /// } @@ -316,7 +312,7 @@ where self.fut = Some(Box::pin(async move { let mut body = BytesMut::with_capacity(8192); - while let Some(item) = stream.next().await { + while let Some(item) = next(&mut stream).await { let chunk = item?; if (body.len() + chunk.len()) > limit { return Err(UrlencodedError::Overflow { diff --git a/ntex/src/web/types/json.rs b/ntex/src/web/types/json.rs index 82d1cd2b..5ec32eb6 100644 --- a/ntex/src/web/types/json.rs +++ b/ntex/src/web/types/json.rs @@ -3,8 +3,6 @@ use std::{fmt, future::Future, ops, pin::Pin, sync::Arc, task::Context, task::Poll}; use bytes::BytesMut; -use futures::future::{ready, Ready}; -use futures::StreamExt; use serde::de::DeserializeOwned; use serde::Serialize; @@ -12,8 +10,10 @@ use serde::Serialize; use crate::http::encoding::Decoder; use crate::http::header::CONTENT_LENGTH; use crate::http::{HttpMessage, Payload, Response, StatusCode}; +use crate::util::next; use crate::web::error::{ErrorRenderer, JsonError, JsonPayloadError, WebResponseError}; -use crate::web::{FromRequest, HttpRequest, Responder}; +use crate::web::responder::{Ready, Responder}; +use crate::web::{FromRequest, HttpRequest}; /// Json helper /// @@ -31,9 +31,8 @@ use crate::web::{FromRequest, HttpRequest, Responder}; /// /// ```rust /// use ntex::web; -/// use serde_derive::Deserialize; /// -/// #[derive(Deserialize)] +/// #[derive(serde::Deserialize)] /// struct Info { /// username: String, /// } @@ -58,9 +57,8 @@ use crate::web::{FromRequest, HttpRequest, Responder}; /// /// ```rust /// use ntex::web; -/// use serde_derive::Serialize; /// -/// #[derive(Serialize)] +/// #[derive(serde::Serialize)] /// struct MyObj { /// name: String, /// } @@ -123,14 +121,13 @@ where fn respond_to(self, req: &HttpRequest) -> Self::Future { let body = match serde_json::to_string(&self.0) { Ok(body) => body, - Err(e) => return ready(e.error_response(req)), + Err(e) => return e.error_response(req).into(), }; - ready( - Response::build(StatusCode::OK) - .content_type("application/json") - .body(body), - ) + Response::build(StatusCode::OK) + .content_type("application/json") + .body(body) + .into() } } @@ -147,9 +144,8 @@ where /// /// ```rust /// use ntex::web; -/// use serde_derive::Deserialize; /// -/// #[derive(Deserialize)] +/// #[derive(serde::Deserialize)] /// struct Info { /// username: String, /// } @@ -203,9 +199,8 @@ where /// ```rust /// use ntex::http::error; /// use ntex::web::{self, App, FromRequest, HttpResponse}; -/// use serde_derive::Deserialize; /// -/// #[derive(Deserialize)] +/// #[derive(serde::Deserialize)] /// struct Info { /// username: String, /// } @@ -362,7 +357,7 @@ where self.fut = Some(Box::pin(async move { let mut body = BytesMut::with_capacity(8192); - while let Some(item) = stream.next().await { + while let Some(item) = next(&mut stream).await { let chunk = item?; if (body.len() + chunk.len()) > limit { return Err(JsonPayloadError::Overflow); @@ -380,13 +375,14 @@ where #[cfg(test)] mod tests { use bytes::Bytes; - use serde_derive::{Deserialize, Serialize}; use super::*; use crate::http::header; use crate::web::test::{from_request, respond_to, TestRequest}; - #[derive(Serialize, Deserialize, PartialEq, Debug, derive_more::Display)] + #[derive( + serde::Serialize, serde::Deserialize, PartialEq, Debug, derive_more::Display, + )] struct MyObject { name: String, } diff --git a/ntex/src/web/types/path.rs b/ntex/src/web/types/path.rs index 8f31f056..b1ef4fd7 100644 --- a/ntex/src/web/types/path.rs +++ b/ntex/src/web/types/path.rs @@ -1,13 +1,11 @@ //! Path extractor use std::{fmt, ops}; -use futures::future::{ready, Ready}; use serde::de; -use crate::http::Payload; -use crate::router::PathDeserializer; use crate::web::error::{ErrorRenderer, PathError}; use crate::web::{FromRequest, HttpRequest}; +use crate::{http::Payload, router::PathDeserializer, util::Ready}; #[derive(PartialEq, Eq, PartialOrd, Ord)] /// Extract typed information from the request's path. @@ -39,9 +37,8 @@ use crate::web::{FromRequest, HttpRequest}; /// /// ```rust /// use ntex::web; -/// use serde_derive::Deserialize; /// -/// #[derive(Deserialize)] +/// #[derive(serde::Deserialize)] /// struct Info { /// username: String, /// } @@ -134,9 +131,8 @@ impl fmt::Display for Path { /// /// ```rust /// use ntex::web; -/// use serde_derive::Deserialize; /// -/// #[derive(Deserialize)] +/// #[derive(serde::Deserialize)] /// struct Info { /// username: String, /// } @@ -158,11 +154,11 @@ where T: de::DeserializeOwned, { type Error = PathError; - type Future = Ready>; + type Future = Ready; #[inline] fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future { - ready( + Ready::result( de::Deserialize::deserialize(PathDeserializer::new(req.match_info())) .map(|inner| Path { inner }) .map_err(move |e| { @@ -180,20 +176,19 @@ where #[cfg(test)] mod tests { use derive_more::Display; - use serde_derive::Deserialize; use super::*; use crate::router::Router; use crate::web::test::{from_request, TestRequest}; - #[derive(Deserialize, Debug, Display)] + #[derive(serde::Deserialize, Debug, Display)] #[display(fmt = "MyStruct({}, {})", key, value)] struct MyStruct { key: String, value: String, } - #[derive(Deserialize)] + #[derive(serde::Deserialize)] struct Test2 { key: String, value: u32, diff --git a/ntex/src/web/types/payload.rs b/ntex/src/web/types/payload.rs index 2622b7d3..60ed6d9f 100644 --- a/ntex/src/web/types/payload.rs +++ b/ntex/src/web/types/payload.rs @@ -3,12 +3,11 @@ use std::{future::Future, pin::Pin, str, task::Context, task::Poll}; use bytes::{Bytes, BytesMut}; use encoding_rs::UTF_8; -use futures::future::Either; -use futures::{Stream, StreamExt}; +use futures_core::Stream; use mime::Mime; use crate::http::{error, header, HttpMessage}; -use crate::util::Ready; +use crate::util::{next, Either, Ready}; use crate::web::error::{ErrorRenderer, PayloadError}; use crate::web::{FromRequest, HttpRequest}; @@ -18,14 +17,14 @@ use crate::web::{FromRequest, HttpRequest}; /// /// ```rust /// use bytes::BytesMut; -/// use futures::{Future, Stream, StreamExt}; +/// use futures::{Future, Stream}; /// use ntex::web::{self, error, App, HttpResponse}; /// /// /// extract binary data from request /// async fn index(mut body: web::types::Payload) -> Result /// { /// let mut bytes = BytesMut::new(); -/// while let Some(item) = body.next().await { +/// while let Some(item) = ntex::util::next(&mut body).await { /// bytes.extend_from_slice(&item?); /// } /// @@ -68,14 +67,14 @@ impl Stream for Payload { /// /// ```rust /// use bytes::BytesMut; -/// use futures::{Future, Stream, StreamExt}; +/// use futures::{Future, Stream}; /// use ntex::web::{self, error, App, Error, HttpResponse}; /// /// /// extract binary data from request /// async fn index(mut body: web::types::Payload) -> Result /// { /// let mut bytes = BytesMut::new(); -/// while let Some(item) = body.next().await { +/// while let Some(item) = ntex::util::next(&mut body).await { /// bytes.extend_from_slice(&item?); /// } /// @@ -393,7 +392,7 @@ impl Future for HttpMessageBody { self.fut = Some(Box::pin(async move { let mut body = BytesMut::with_capacity(8192); - while let Some(item) = stream.next().await { + while let Some(item) = next(&mut stream).await { let chunk = item?; if body.len() + chunk.len() > limit { return Err(PayloadError::from(error::PayloadError::Overflow)); @@ -410,7 +409,6 @@ impl Future for HttpMessageBody { #[cfg(test)] mod tests { use bytes::Bytes; - use futures::StreamExt; use super::*; use crate::http::header; @@ -444,7 +442,7 @@ mod tests { .await .unwrap() .into_inner(); - let b = s.next().await.unwrap().unwrap(); + let b = next(&mut s).await.unwrap().unwrap(); assert_eq!(b, Bytes::from_static(b"hello=world")); } diff --git a/ntex/src/web/types/query.rs b/ntex/src/web/types/query.rs index a00b859c..524684ea 100644 --- a/ntex/src/web/types/query.rs +++ b/ntex/src/web/types/query.rs @@ -20,15 +20,14 @@ use crate::{http::Payload, util::Ready}; /// /// ```rust /// use ntex::web; -/// use serde_derive::Deserialize; /// -/// #[derive(Debug, Deserialize)] +/// #[derive(Debug, serde::Deserialize)] /// pub enum ResponseType { /// Token, /// Code /// } /// -/// #[derive(Deserialize)] +/// #[derive(serde::Deserialize)] /// pub struct AuthRequest { /// id: u64, /// response_type: ResponseType, @@ -98,15 +97,14 @@ impl fmt::Display for Query { /// /// ```rust /// use ntex::web; -/// use serde_derive::Deserialize; /// -/// #[derive(Debug, Deserialize)] +/// #[derive(Debug, serde::Deserialize)] /// pub enum ResponseType { /// Token, /// Code /// } /// -/// #[derive(Deserialize)] +/// #[derive(serde::Deserialize)] /// pub struct AuthRequest { /// id: u64, /// response_type: ResponseType, @@ -153,12 +151,11 @@ where #[cfg(test)] mod tests { use derive_more::Display; - use serde_derive::Deserialize; use super::*; use crate::web::test::{from_request, TestRequest}; - #[derive(Deserialize, Debug, Display)] + #[derive(serde::Deserialize, Debug, Display)] struct Id { id: String, } diff --git a/ntex/src/web/ws.rs b/ntex/src/web/ws.rs index 250f3c78..76576073 100644 --- a/ntex/src/web/ws.rs +++ b/ntex/src/web/ws.rs @@ -1,7 +1,10 @@ -use std::error::Error as StdError; +use std::{ + error::Error as StdError, marker::PhantomData, pin::Pin, task::Context, task::Poll, +}; use bytes::Bytes; -use futures::{Sink, Stream, TryStreamExt}; +use futures_core::Stream; +use futures_sink::Sink; pub use crate::ws::{CloseCode, CloseReason, Frame, Message}; @@ -84,10 +87,10 @@ where // start websockets service dispatcher rt::spawn(crate::util::stream::Dispatcher::new( // wrap bytes stream to ws::Frame's stream - ws::StreamDecoder::new(payload).map_err(|e| { - let e: Box = Box::new(e); - e - }), + MapStream { + stream: ws::StreamDecoder::new(payload), + _t: PhantomData, + }, // converter wraper from ws::Message to Bytes sink, // websockets handler service @@ -96,3 +99,35 @@ where Ok(res.body(Body::from_message(BoxedBodyStream::new(rx)))) } + +pin_project_lite::pin_project! { + struct MapStream{ + #[pin] + stream: S, + _t: PhantomData<(I, E)>, + } +} + +impl Stream for MapStream +where + S: Stream>, + E: StdError + 'static, +{ + type Item = Result>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.project().stream.poll_next(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(Ok(item))), + Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(Box::new(err)))), + Poll::Ready(None) => Poll::Ready(None), + } + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} diff --git a/ntex/src/ws/frame.rs b/ntex/src/ws/frame.rs index c0281f25..23832fb6 100644 --- a/ntex/src/ws/frame.rs +++ b/ntex/src/ws/frame.rs @@ -2,6 +2,7 @@ use std::convert::TryFrom; use bytes::{Buf, BufMut, BytesMut}; use log::debug; +use nanorand::{WyRand, RNG}; use super::mask::apply_mask; use super::proto::{CloseCode, CloseReason, OpCode}; @@ -187,7 +188,7 @@ impl Parser { }; if mask { - let mask = rand::random::(); + let mask: u32 = WyRand::new().generate(); dst.put_u32_le(mask); dst.extend_from_slice(payload.as_ref()); let pos = dst.len() - payload_len; diff --git a/ntex/src/ws/stream.rs b/ntex/src/ws/stream.rs index 5935c09c..d562c4b3 100644 --- a/ntex/src/ws/stream.rs +++ b/ntex/src/ws/stream.rs @@ -3,10 +3,10 @@ use std::{ }; use bytes::{Bytes, BytesMut}; -use futures::{Sink, Stream}; use ntex_codec::{Decoder, Encoder}; use super::{Codec, Frame, Message, ProtocolError}; +use crate::{Sink, Stream}; /// Stream error #[derive(Debug, Display)] @@ -171,10 +171,9 @@ where #[cfg(test)] mod tests { use bytestring::ByteString; - use futures::{SinkExt, StreamExt}; use super::*; - use crate::channel::mpsc; + use crate::{channel::mpsc, util::next, util::poll_fn, util::send}; #[crate::rt_test] async fn test_decoder() { @@ -191,12 +190,12 @@ mod tests { .unwrap(); tx.send(Ok::<_, ()>(buf.split().freeze())).unwrap(); - let frame = StreamExt::next(&mut decoder).await.unwrap().unwrap(); + let frame = next(&mut decoder).await.unwrap().unwrap(); match frame { Frame::Text(data) => assert_eq!(data, b"test1"[..]), _ => panic!(), } - let frame = StreamExt::next(&mut decoder).await.unwrap().unwrap(); + let frame = next(&mut decoder).await.unwrap().unwrap(); match frame { Frame::Text(data) => assert_eq!(data, b"test2"[..]), _ => panic!(), @@ -208,15 +207,21 @@ mod tests { let (tx, mut rx) = mpsc::channel(); let mut encoder = StreamEncoder::new(tx); - encoder - .send(Ok::<_, ()>(Message::Text(ByteString::from_static("test")))) + send( + &mut encoder, + Ok::<_, ()>(Message::Text(ByteString::from_static("test"))), + ) + .await + .unwrap(); + poll_fn(|cx| Pin::new(&mut encoder).poll_flush(cx)) + .await + .unwrap(); + poll_fn(|cx| Pin::new(&mut encoder).poll_close(cx)) .await .unwrap(); - encoder.flush().await.unwrap(); - encoder.close().await.unwrap(); - let data = rx.next().await.unwrap().unwrap(); + let data = next(&mut rx).await.unwrap().unwrap(); assert_eq!(data, b"\x81\x04test".as_ref()); - assert!(rx.next().await.is_none()); + assert!(next(&mut rx).await.is_none()); } } diff --git a/ntex/tests/http_awc_openssl_client.rs b/ntex/tests/http_awc_openssl_client.rs index 7ec49a29..2f70415b 100644 --- a/ntex/tests/http_awc_openssl_client.rs +++ b/ntex/tests/http_awc_openssl_client.rs @@ -63,7 +63,7 @@ async fn test_connection_reuse_h2() { builder.set_verify(SslVerifyMode::NONE); let _ = builder .set_alpn_protos(b"\x02h2\x08http/1.1") - .map_err(|e| log::error!("Can not set alpn protocol: {:?}", e)); + .map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e)); let client = Client::build() .connector(Connector::default().openssl(builder.build()).finish()) diff --git a/ntex/tests/http_openssl.rs b/ntex/tests/http_openssl.rs index 402f601f..dde2761e 100644 --- a/ntex/tests/http_openssl.rs +++ b/ntex/tests/http_openssl.rs @@ -50,7 +50,7 @@ fn ssl_acceptor() -> SslAcceptor { }); builder .set_alpn_protos(b"\x08http/1.1\x02h2") - .expect("Can not contrust SslAcceptor"); + .expect("Cannot contrust SslAcceptor"); builder.build() } diff --git a/ntex/tests/web_httpserver.rs b/ntex/tests/web_httpserver.rs index 91b53843..18c0cb57 100644 --- a/ntex/tests/web_httpserver.rs +++ b/ntex/tests/web_httpserver.rs @@ -87,7 +87,7 @@ fn client() -> ntex::http::client::Client { builder.set_verify(SslVerifyMode::NONE); let _ = builder .set_alpn_protos(b"\x02h2\x08http/1.1") - .map_err(|e| log::error!("Can not set alpn protocol: {:?}", e)); + .map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e)); ntex::http::client::Client::build() .timeout(Duration::from_millis(30000))