This commit is contained in:
Nikolay Kim 2024-05-28 15:20:22 +05:00
parent b0f9a09fe9
commit b25223265c
24 changed files with 129 additions and 182 deletions

View file

@ -37,3 +37,5 @@ ntex-util = { path = "ntex-util" }
ntex-glommio = { path = "ntex-glommio" }
ntex-tokio = { path = "ntex-tokio" }
ntex-async-std = { path = "ntex-async-std" }
ntex-h2 = { path = "../dev/ntex-h2" }

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-async-std"
version = "0.4.0"
version = "0.5.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "async-std intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -16,9 +16,9 @@ name = "ntex_async_std"
path = "src/lib.rs"
[dependencies]
ntex-bytes = "0.1.21"
ntex-io = "1.0.0"
ntex-util = "1.0.0"
ntex-bytes = "0.1"
ntex-io = "2.0"
ntex-util = "2.0"
log = "0.4"
async-std = { version = "1", features = ["unstable"] }
oneshot = { version = "0.1", default-features = false, features = ["async"] }

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-glommio"
version = "0.4.0"
version = "0.5.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "glommio intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -16,9 +16,9 @@ name = "ntex_glommio"
path = "src/lib.rs"
[dependencies]
ntex-bytes = "0.1.24"
ntex-io = "1.0.0"
ntex-util = "1.0.0"
ntex-bytes = "0.1"
ntex-io = "2.0"
ntex-util = "2.0"
futures-lite = "2.2"
log = "0.4"
oneshot = { version = "0.1", default-features = false, features = ["async"] }

View file

@ -2,7 +2,6 @@
#![allow(clippy::let_underscore_future)]
use std::{cell::Cell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
use ntex_bytes::Pool;
use ntex_codec::{Decoder, Encoder};
use ntex_service::{Pipeline, PipelineBinding, PipelineCall, Service};
use ntex_util::{future::Either, ready, spawn, time::Seconds};
@ -144,7 +143,6 @@ where
flags: Flags,
shared: Rc<DispatcherShared<S, U>>,
response: Option<PipelineCall<S::Response, S::Error>>,
pool: Pool,
cfg: DispatcherConfig,
read_remains: u32,
read_remains_prev: u32,
@ -211,7 +209,6 @@ where
Flags::KA_ENABLED
};
let pool = io.memory_pool().pool();
let shared = Rc::new(DispatcherShared {
io,
codec,
@ -222,7 +219,6 @@ where
Dispatcher {
inner: DispatcherInner {
pool,
shared,
flags,
cfg: cfg.clone(),
@ -278,14 +274,6 @@ where
}
}
// handle memory pool pressure
if slf.pool.poll_ready(cx).is_pending() {
slf.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT);
slf.shared.io.stop_timer();
slf.shared.io.pause();
return Poll::Pending;
}
loop {
match slf.st {
DispatcherState::Processing => {

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-net"
version = "1.0.2"
version = "2.0.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntexwork utils for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -28,16 +28,16 @@ glommio = ["ntex-rt/glommio", "ntex-glommio"]
async-std = ["ntex-rt/async-std", "ntex-async-std"]
[dependencies]
ntex-service = "2.0"
ntex-bytes = "0.1.24"
ntex-service = "3.0"
ntex-bytes = "0.1"
ntex-http = "0.1"
ntex-io = "1.0"
ntex-io = "2.0"
ntex-rt = "0.4.11"
ntex-util = "1.0"
ntex-util = "2.0"
ntex-tokio = { version = "0.4.0", optional = true }
ntex-glommio = { version = "0.4.0", optional = true }
ntex-async-std = { version = "0.4.0", optional = true }
ntex-tokio = { version = "0.5.0", optional = true }
ntex-glommio = { version = "0.5.0", optional = true }
ntex-async-std = { version = "0.5.0", optional = true }
log = "0.4"
thiserror = "1.0"

View file

@ -6,7 +6,6 @@ use ntex_util::future::Either;
use super::{Address, Connect, ConnectError};
#[derive(Copy)]
/// DNS Resolver Service
pub struct Resolver<T>(marker::PhantomData<T>);
@ -17,6 +16,8 @@ impl<T> Resolver<T> {
}
}
impl<T> Copy for Resolver<T> {}
impl<T: Address> Resolver<T> {
/// Lookup ip addresses for provided host
pub async fn lookup(&self, req: Connect<T>) -> Result<Connect<T>, ConnectError> {

View file

@ -9,13 +9,15 @@ use ntex_util::future::{BoxFuture, Either};
use super::{Address, Connect, ConnectError, Resolver};
use crate::tcp_connect_in;
#[derive(Copy)]
/// Basic tcp stream connector
pub struct Connector<T> {
resolver: Resolver<T>,
pool: PoolRef,
tag: &'static str,
}
impl<T> Copy for Connector<T> {}
impl<T> Connector<T> {
/// Construct new connect service with default dns resolver
pub fn new() -> Self {

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-server"
version = "1.0.5"
version = "2.0.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Server for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -16,11 +16,11 @@ name = "ntex_server"
path = "src/lib.rs"
[dependencies]
ntex-bytes = "0.1.24"
ntex-net = "1.0"
ntex-service = "2.0"
ntex-rt = "0.4.13"
ntex-util = "1.0"
ntex-bytes = "0.1"
ntex-net = "2.0"
ntex-service = "3.0"
ntex-rt = "0.4"
ntex-util = "2.0"
async-channel = "2"
async-broadcast = "0.7"

View file

@ -1,4 +1,4 @@
use std::{cell::Cell, rc::Rc, task};
use std::{cell::Cell, rc::Rc, task, task::Poll, future::poll_fn};
use ntex_util::task::LocalWaker;
@ -30,8 +30,12 @@ impl Counter {
/// Check if counter is not at capacity. If counter at capacity
/// it registers notification for current task.
pub(super) fn available(&self, cx: &mut task::Context<'_>) -> bool {
self.0.available(cx)
pub(super) async fn available(&self) {
poll_fn(|cx| if self.0.available(cx) {
Poll::Ready(())
} else {
Poll::Pending
}).await
}
/// Get total number of acquired counts

View file

@ -1,4 +1,3 @@
use std::task::{Context, Poll};
use std::{fmt, future::Future, marker::PhantomData};
use ntex_bytes::PoolId;
@ -144,10 +143,10 @@ where
type Response = ();
type Error = ();
ntex_service::forward_poll_shutdown!(inner);
ntex_service::forward_shutdown!(inner);
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
self.inner.poll_ready(cx).map_err(|_| ())
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
ctx.ready(&self.inner).await.map_err(|_| ())
}
async fn call(&self, req: Io, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {

View file

@ -1,5 +1,3 @@
use std::{task::Context, task::Poll};
use ntex_bytes::{Pool, PoolRef};
use ntex_net::Io;
use ntex_service::{boxed, Service, ServiceCtx, ServiceFactory};
@ -142,53 +140,34 @@ impl Service<ServerMessage> for StreamServiceImpl {
type Response = ();
type Error = ();
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut ready = self.conns.available(cx);
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
self.conns.available().await;
for (idx, svc) in self.services.iter().enumerate() {
match svc.poll_ready(cx) {
Poll::Pending => ready = false,
Poll::Ready(Ok(())) => (),
Poll::Ready(Err(_)) => {
match ctx.ready(svc).await {
Ok(()) => (),
Err(_) => {
for (idx_, tag, _, _) in self.tokens.values() {
if idx == *idx_ {
log::error!("{}: Service readiness has failed", tag);
break;
}
}
return Poll::Ready(Err(()));
return Err(());
}
}
}
// check memory pools
for (_, _, pool, _) in self.tokens.values() {
ready = pool.poll_ready(cx).is_ready() && ready;
}
if ready {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
Ok(())
}
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
let mut ready = true;
async fn shutdown(&self) {
for svc in &self.services {
match svc.poll_shutdown(cx) {
Poll::Pending => ready = false,
Poll::Ready(_) => (),
}
}
if ready {
log::info!(
"Worker service shutdown, {} connections",
super::num_connections()
);
Poll::Ready(())
} else {
Poll::Pending
svc.shutdown().await;
}
log::info!(
"Worker service shutdown, {} connections",
super::num_connections()
);
}
async fn call(&self, req: ServerMessage, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {

View file

@ -6,7 +6,7 @@ use async_broadcast::{self as bus, broadcast};
use async_channel::{unbounded, Receiver, Sender};
use ntex_rt::{spawn, Arbiter};
use ntex_service::{Pipeline, ServiceFactory};
use ntex_service::{Pipeline, ServiceFactory, PipelineBinding};
use ntex_util::future::{select, stream_recv, Either, Stream};
use ntex_util::time::{sleep, timeout_checked, Millis};
@ -240,7 +240,7 @@ struct WorkerSt<T, F: ServiceFactory<WorkerMessage<T>>> {
availability: WorkerAvailabilityTx,
}
async fn run_worker<T, F>(mut svc: Pipeline<F::Service>, mut wrk: WorkerSt<T, F>)
async fn run_worker<T, F>(mut svc: PipelineBinding<F::Service, WorkerMessage<T>>, mut wrk: WorkerSt<T, F>)
where
T: Send + 'static,
F: ServiceFactory<WorkerMessage<T>> + 'static,
@ -250,7 +250,7 @@ where
ready!(svc.poll_ready(cx)?);
if let Some(item) = ready!(Pin::new(&mut wrk.rx).poll_next(cx)) {
let fut = svc.call_static(WorkerMessage::New(item));
let fut = svc.call(WorkerMessage::New(item));
let _ = spawn(async move {
let _ = fut.await;
});
@ -267,17 +267,17 @@ where
wrk.availability.set(false);
if timeout.is_zero() {
let fut = svc.call_static(WorkerMessage::ForceShutdown);
let fut = svc.call(WorkerMessage::ForceShutdown);
let _ = spawn(async move {
let _ = fut.await;
});
sleep(STOP_TIMEOUT).await;
} else {
let fut = svc.call_static(WorkerMessage::Shutdown(timeout));
let fut = svc.call(WorkerMessage::Shutdown(timeout));
let res = timeout_checked(timeout, fut).await;
let _ = result.send(res.is_ok());
};
poll_fn(|cx| svc.poll_shutdown(cx)).await;
svc.shutdown().await;
log::info!("Stopping worker {:?}", wrk.id);
return;
@ -289,7 +289,7 @@ where
match select(wrk.factory.create(()), stream_recv(&mut wrk.stop)).await {
Either::Left(Ok(service)) => {
wrk.availability.set(true);
svc = Pipeline::new(service);
svc = Pipeline::new(service).bind();
break;
}
Either::Left(Err(_)) => sleep(STOP_TIMEOUT).await,
@ -305,7 +305,7 @@ async fn create<T, F>(
stop: Receiver<Shutdown>,
factory: Result<F, ()>,
availability: WorkerAvailabilityTx,
) -> Result<(Pipeline<F::Service>, WorkerSt<T, F>), ()>
) -> Result<(PipelineBinding<F::Service, WorkerMessage<T>>, WorkerSt<T, F>), ()>
where
T: Send + 'static,
F: ServiceFactory<WorkerMessage<T>> + 'static,
@ -317,7 +317,7 @@ where
let mut stop = Box::pin(stop);
let svc = match select(factory.create(()), stream_recv(&mut stop)).await {
Either::Left(Ok(svc)) => Pipeline::new(svc),
Either::Left(Ok(svc)) => Pipeline::new(svc).bind(),
Either::Left(Err(_)) => return Err(()),
Either::Right(Some(Shutdown { result, .. })) => {
log::trace!("Shutdown uninitialized worker");

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-tls"
version = "1.1.0"
version = "2.0.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "An implementation of SSL streams for ntex backed by OpenSSL"
keywords = ["network", "framework", "async", "futures"]
@ -25,11 +25,11 @@ openssl = ["tls_openssl"]
rustls = ["tls_rust"]
[dependencies]
ntex-bytes = "0.1.21"
ntex-io = "1.0"
ntex-util = "1.0"
ntex-service = "2.0"
ntex-net = "1.0"
ntex-bytes = "0.1"
ntex-io = "2.0"
ntex-util = "2.0"
ntex-service = "3.0"
ntex-net = "2.0"
log = "0.4"

View file

@ -1,5 +1,4 @@
#![allow(dead_code)]
use std::{cell::Cell, rc::Rc, task};
use std::{cell::Cell, rc::Rc, task, task::Poll, future::poll_fn};
use ntex_util::task::LocalWaker;
@ -33,8 +32,12 @@ impl Counter {
/// Check if counter is not at capacity. If counter at capacity
/// it registers notification for current task.
pub(super) fn available(&self, cx: &mut task::Context<'_>) -> bool {
self.0.available(cx)
pub(super) async fn available(&self) {
poll_fn(|cx| if self.0.available(cx) {
Poll::Ready(())
} else {
Poll::Pending
}).await
}
}

View file

@ -1,4 +1,4 @@
use std::{cell::RefCell, error::Error, fmt, io, task::Context, task::Poll};
use std::{cell::RefCell, error::Error, fmt, io};
use ntex_io::{Filter, Io, Layer};
use ntex_service::{Service, ServiceCtx, ServiceFactory};
@ -97,12 +97,9 @@ impl<F: Filter> Service<Io<F>> for SslAcceptorService {
type Response = Io<Layer<SslFilter, F>>;
type Error = Box<dyn Error>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(cx) {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
self.conns.available().await;
Ok(())
}
async fn call(

View file

@ -29,8 +29,7 @@ impl<T: Address> SslConnector<T> {
pub fn memory_pool(self, id: PoolId) -> Self {
let connector = self
.connector
.into_service()
.expect("Connector has been cloned")
.get_ref()
.memory_pool(id)
.into();

View file

@ -1,4 +1,3 @@
use std::task::{Context, Poll};
use std::{io, sync::Arc};
use tls_rust::ServerConfig;
@ -81,12 +80,9 @@ impl<F: Filter> Service<Io<F>> for TlsAcceptorService {
type Response = Io<Layer<TlsServerFilter, F>>;
type Error = io::Error;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(cx) {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
self.conns.available().await;
Ok(())
}
async fn call(

View file

@ -38,8 +38,7 @@ impl<T: Address> TlsConnector<T> {
pub fn memory_pool(self, id: PoolId) -> Self {
let connector = self
.connector
.into_service()
.unwrap()
.get_ref()
.memory_pool(id)
.into();
Self {

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-tokio"
version = "0.4.0"
version = "0.5.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "tokio intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -16,8 +16,8 @@ name = "ntex_tokio"
path = "src/lib.rs"
[dependencies]
ntex-bytes = "0.1.21"
ntex-io = "1.0.0"
ntex-util = "1.0.0"
ntex-bytes = "0.1"
ntex-io = "2.0"
ntex-util = "2.0"
log = "0.4"
tokio = { version = "1", default-features = false, features = ["rt", "net", "sync", "signal"] }

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "1.2.1"
version = "2.0.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"
@ -60,16 +60,16 @@ brotli = ["dep:brotli2"]
ntex-codec = "0.6.2"
ntex-http = "0.1.12"
ntex-router = "0.5.3"
ntex-service = "2.0.1"
ntex-service = "3.0.0"
ntex-macros = "0.1.3"
ntex-util = "1.0.1"
ntex-util = "2.0.0"
ntex-bytes = "0.1.25"
ntex-server = "1.0.5"
ntex-h2 = "0.5.4"
ntex-server = "2.0.0"
ntex-h2 = "1.0.0"
ntex-rt = "0.4.12"
ntex-io = "1.2.0"
ntex-net = "1.0.1"
ntex-tls = "1.1.0"
ntex-io = "2.0.0"
ntex-net = "2.0.0"
ntex-tls = "2.0.0"
base64 = "0.22"
bitflags = "2"

View file

@ -6,6 +6,7 @@ use crate::http::error::{DispatchError, ResponseError};
use crate::http::{request::Request, response::Response};
use crate::io::{types, Filter, Io};
use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
use crate::util::join;
use super::control::{Control, ControlAck};
use super::default::DefaultControlService;
@ -208,43 +209,24 @@ where
type Response = ();
type Error = DispatchError;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
async fn ready(&self) -> Result<(), Self::Error> {
let cfg = self.config.as_ref();
let ready1 = cfg
.control
.poll_ready(cx)
.map_err(|e| {
log::error!("Http control service readiness error: {:?}", e);
DispatchError::Control(Box::new(e))
})?
.is_ready();
let (ready1, ready2) = join(cfg.control.ready(), cfg.service.ready()).await;
let ready2 = cfg
.service
.poll_ready(cx)
.map_err(|e| {
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(Box::new(e))
})?
.is_ready();
if ready1 && ready2 {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
ready1.map_err(|e| {
log::error!("Http control service readiness error: {:?}", e);
DispatchError::Control(Box::new(e))
})?;
ready2.map_err(|e| {
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(Box::new(e))
})
}
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
let ready1 = self.config.control.poll_shutdown(cx).is_ready();
let ready2 = self.config.service.poll_shutdown(cx).is_ready();
if ready1 && ready2 {
Poll::Ready(())
} else {
Poll::Pending
}
async fn shutdown(&self) {
self.config.control.shutdown().await;
self.config.service.shutdown().await;
}
async fn call(&self, io: Io<F>, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {

View file

@ -209,15 +209,17 @@ where
type Response = ();
type Error = DispatchError;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.config.service.poll_ready(cx).map_err(|e| {
#[inline]
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
self.config.service.ready().await.map_err(|e| {
log::error!("Service readiness error: {:?}", e);
DispatchError::Service(Box::new(e))
})
}
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
self.config.service.poll_shutdown(cx)
#[inline]
async fn shutdown(&self) {
self.config.service.shutdown().await
}
async fn call(

View file

@ -5,7 +5,7 @@ use crate::router::{Path, ResourceDef, Router};
use crate::service::boxed::{self, BoxService, BoxServiceFactory};
use crate::service::dev::ServiceChainFactory;
use crate::service::{fn_service, Middleware, Service, ServiceCtx, ServiceFactory};
use crate::util::{BoxFuture, Extensions};
use crate::util::{join, BoxFuture, Extensions};
use super::config::AppConfig;
use super::error::ErrorRenderer;
@ -202,8 +202,8 @@ where
type Response = WebResponse;
type Error = T::Error;
crate::forward_poll_ready!(service);
crate::forward_poll_shutdown!(service);
crate::forward_ready!(service);
crate::forward_shutdown!(service);
async fn call(
&self,
@ -294,14 +294,11 @@ where
type Error = Err::Container;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let ready1 = self.filter.poll_ready(cx)?.is_ready();
let ready2 = self.routing.poll_ready(cx)?.is_ready();
if ready1 && ready2 {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
let (ready1, ready2) =
join(ctx.ready(&self.filter), ctx.ready(&self.routing)).await;
ready1?;
ready2
}
async fn call(

View file

@ -5,7 +5,7 @@ use crate::router::{IntoPattern, ResourceDef, Router};
use crate::service::boxed::{self, BoxService, BoxServiceFactory};
use crate::service::{chain_factory, dev::ServiceChainFactory, IntoServiceFactory};
use crate::service::{Identity, Middleware, Service, ServiceCtx, ServiceFactory, Stack};
use crate::util::Extensions;
use crate::util::{join, Extensions};
use super::app::Filter;
use super::config::ServiceConfig;
@ -486,14 +486,11 @@ where
type Error = Err::Container;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let ready1 = self.filter.poll_ready(cx)?.is_ready();
let ready2 = self.routing.poll_ready(cx)?.is_ready();
if ready1 && ready2 {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
let (ready1, ready2) =
join(ctx.ready(&self.filter), ctx.ready(&self.routing)).await;
ready1?;
ready2
}
async fn call(