Replace actix-threadpool with tokio utils

This commit is contained in:
Nikolay Kim 2021-01-25 17:52:27 +06:00
parent 26543a4247
commit b4ccf776dd
10 changed files with 70 additions and 43 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.2] - 2021-01-25
* Replace actix-threadpool with tokio's task utils
## [0.1.1] - 2020-04-15
* Api cleanup

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-rt"
version = "0.1.1"
version = "0.1.2"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex runtime"
keywords = ["network", "framework", "async", "futures"]
@ -17,6 +17,5 @@ path = "src/lib.rs"
[dependencies]
ntex-rt-macros = "0.1.0"
actix-threadpool = "0.3"
futures = "0.3.4"
tokio = { version = "0.2.6", default-features=false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] }
futures = "0.3.12"
tokio = { version = "0.2.6", default-features=false, features = ["rt-core", "rt-util", "io-driver", "blocking", "tcp", "uds", "udp", "time", "signal", "stream"] }

View file

@ -14,9 +14,6 @@ pub use self::system::System;
#[cfg(not(test))] // Work around for rust-lang/rust#62127
pub use ntex_rt_macros::{rt_main as main, rt_test as test};
#[doc(hidden)]
pub use actix_threadpool as blocking;
/// Spawn a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread.
@ -25,7 +22,7 @@ pub use actix_threadpool as blocking;
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn<F>(f: F) -> tokio::task::JoinHandle<F::Output>
pub fn spawn<F>(f: F) -> self::task::JoinHandle<F::Output>
where
F: futures::Future + 'static,
{
@ -78,3 +75,8 @@ pub mod time {
pub use tokio::time::{interval, interval_at, Interval};
pub use tokio::time::{timeout, Timeout};
}
/// Task management.
pub mod task {
pub use tokio::task::{spawn_blocking, yield_now, JoinError, JoinHandle};
}

View file

@ -6,6 +6,8 @@
* Refactor framed disaptcher write back-pressure support
* Replace actix-threadpool with tokio utils
## [0.2.0-b.6] - 2021-01-24
* http: Pass io stream to upgrade handler

View file

@ -37,13 +37,12 @@ cookie = ["coo-kie", "coo-kie/percent-encode"]
[dependencies]
ntex-codec = "0.3.0-b.1"
ntex-rt = "0.1.1"
ntex-rt = "0.1.2"
ntex-rt-macros = "0.1"
ntex-router = "0.3.8"
ntex-service = "0.1.5"
ntex-macros = "0.1"
actix-threadpool = "0.3.3"
base64 = "0.13"
bitflags = "1.2.1"
bytes = "0.5.6"
@ -73,7 +72,7 @@ socket2 = "0.3.12"
url = "2.1"
time = { version = "0.2.11", default-features = false, features = ["std"] }
coo-kie = { version = "0.14.2", package = "cookie", optional = true }
tokio = "0.2.6"
tokio = "=0.2.6"
# resolver
trust-dns-proto = { version = "0.19.6", default-features = false }
@ -106,4 +105,4 @@ rust-tls = { version = "0.19.0", package="rustls", features = ["dangerous_config
webpki = "0.21.2"
[patch.crates-io]
ntex = { path = "../ntex-codec" }
ntex-rt = { path = "../ntex-rt" }

View file

@ -3,7 +3,6 @@ use std::io::{self, Write};
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_threadpool::{run, CpuFuture};
use brotli2::write::BrotliDecoder;
use bytes::Bytes;
use flate2::write::{GzDecoder, ZlibDecoder};
@ -12,6 +11,7 @@ use futures::{ready, Stream};
use super::Writer;
use crate::http::error::PayloadError;
use crate::http::header::{ContentEncoding, HeaderMap, CONTENT_ENCODING};
use crate::rt::task::{spawn_blocking, JoinHandle};
const INPLACE: usize = 2049;
@ -19,7 +19,7 @@ pub struct Decoder<S> {
decoder: Option<ContentDecoder>,
stream: S,
eof: bool,
fut: Option<CpuFuture<(Option<Bytes>, ContentDecoder), io::Error>>,
fut: Option<JoinHandle<Result<(Option<Bytes>, ContentDecoder), io::Error>>>,
}
impl<S> Decoder<S>
@ -80,7 +80,8 @@ where
loop {
if let Some(ref mut fut) = self.fut {
let (chunk, decoder) = match ready!(Pin::new(fut).poll(cx)) {
Ok(item) => item,
Ok(Ok(item)) => item,
Ok(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Err(e) => return Poll::Ready(Some(Err(e.into()))),
};
self.decoder = Some(decoder);
@ -105,7 +106,7 @@ where
return Poll::Ready(Some(Ok(chunk)));
}
} else {
self.fut = Some(run(move || {
self.fut = Some(spawn_blocking(move || {
let chunk = decoder.feed_data(chunk)?;
Ok((chunk, decoder))
}));

View file

@ -5,7 +5,6 @@ use std::io::{self, Write};
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_threadpool::{run, BlockingError, CpuFuture};
use brotli2::write::BrotliEncoder;
use bytes::Bytes;
use flate2::write::{GzEncoder, ZlibEncoder};
@ -14,6 +13,7 @@ use futures::ready;
use crate::http::body::{Body, BodySize, MessageBody, ResponseBody};
use crate::http::header::{ContentEncoding, HeaderValue, CONTENT_ENCODING};
use crate::http::{ResponseHead, StatusCode};
use crate::rt::task::{spawn_blocking, JoinHandle};
use super::Writer;
@ -23,7 +23,7 @@ pub struct Encoder<B> {
eof: bool,
body: EncoderBody<B>,
encoder: Option<ContentEncoder>,
fut: Option<CpuFuture<ContentEncoder, io::Error>>,
fut: Option<JoinHandle<Result<ContentEncoder, io::Error>>>,
}
impl<B: MessageBody + 'static> Encoder<B> {
@ -96,15 +96,13 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
if let Some(ref mut fut) = self.fut {
let mut encoder = match ready!(Pin::new(fut).poll(cx)) {
Ok(item) => item,
Err(e) => {
let e = match e {
BlockingError::Error(e) => e,
BlockingError::Canceled => {
io::Error::new(io::ErrorKind::Other, "Canceled")
}
};
return Poll::Ready(Some(Err(Box::new(e))));
Ok(Ok(item)) => item,
Ok(Err(e)) => return Poll::Ready(Some(Err(Box::new(e)))),
Err(_) => {
return Poll::Ready(Some(Err(Box::new(io::Error::new(
io::ErrorKind::Other,
"Canceled",
)))));
}
};
let chunk = encoder.take();
@ -137,7 +135,7 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
return Poll::Ready(Some(Ok(chunk)));
}
} else {
self.fut = Some(run(move || {
self.fut = Some(spawn_blocking(move || {
encoder.write(&chunk)?;
Ok(encoder)
}));

View file

@ -5,12 +5,12 @@ use either::Either;
use http::{header, uri::InvalidUri, StatusCode};
// re-export for convinience
pub use actix_threadpool::BlockingError;
pub use futures::channel::oneshot::Canceled;
pub use http::Error as HttpError;
use crate::http::body::Body;
use crate::http::response::Response;
use crate::rt::task::JoinError;
/// Error that can be converted to `Response`
pub trait ResponseError: fmt::Display + fmt::Debug {
@ -148,18 +148,6 @@ pub enum PayloadError {
impl std::error::Error for PayloadError {}
impl From<BlockingError<io::Error>> for PayloadError {
fn from(err: BlockingError<io::Error>) -> Self {
match err {
BlockingError::Error(e) => PayloadError::Io(e),
BlockingError::Canceled => PayloadError::Io(io::Error::new(
io::ErrorKind::Other,
"Operation is canceled",
)),
}
}
}
impl From<Either<PayloadError, io::Error>> for PayloadError {
fn from(err: Either<PayloadError, io::Error>) -> Self {
match err {
@ -245,6 +233,38 @@ pub enum ContentTypeError {
Expected,
}
/// Blocking operation execution error
#[derive(Debug, Display)]
pub enum BlockingError<E: fmt::Debug> {
#[display(fmt = "{:?}", _0)]
Error(E),
#[display(fmt = "Thread pool is gone")]
Canceled,
}
impl<E: fmt::Debug> std::error::Error for BlockingError<E> {}
impl From<JoinError> for PayloadError {
fn from(_: JoinError) -> Self {
PayloadError::Io(io::Error::new(
io::ErrorKind::Other,
"Operation is canceled",
))
}
}
impl From<BlockingError<io::Error>> for PayloadError {
fn from(err: BlockingError<io::Error>) -> Self {
match err {
BlockingError::Error(e) => PayloadError::Io(e),
BlockingError::Canceled => PayloadError::Io(io::Error::new(
io::ErrorKind::Other,
"Operation is canceled",
)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View file

@ -7,7 +7,6 @@ use std::marker::PhantomData;
use bytes::BytesMut;
use derive_more::{Display, From};
pub use actix_threadpool::BlockingError;
pub use futures::channel::oneshot::Canceled;
pub use http::Error as HttpError;
pub use serde_json::error::Error as JsonError;

View file

@ -261,7 +261,10 @@ where
I: Send + 'static,
E: Send + std::fmt::Debug + 'static,
{
actix_threadpool::run(f).await
match ntex_rt::task::spawn_blocking(f).await {
Ok(res) => res.map_err(BlockingError::Error),
Err(_) => Err(BlockingError::Canceled),
}
}
/// Create new http server with application factory.