mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-05 05:47:40 +03:00
Http gracefull shutdown support (#393)
This commit is contained in:
parent
f574916e15
commit
5f20ee2be5
13 changed files with 377 additions and 45 deletions
|
@ -1,5 +1,9 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [2.2.0] - 2024-08-12
|
||||||
|
|
||||||
|
* Allow to notify dispatcher from IoRef
|
||||||
|
|
||||||
## [2.1.0] - 2024-07-30
|
## [2.1.0] - 2024-07-30
|
||||||
|
|
||||||
* Optimize `Io` layout
|
* Optimize `Io` layout
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "ntex-io"
|
name = "ntex-io"
|
||||||
version = "2.1.0"
|
version = "2.2.0"
|
||||||
authors = ["ntex contributors <team@ntex.rs>"]
|
authors = ["ntex contributors <team@ntex.rs>"]
|
||||||
description = "Utilities for encoding and decoding frames"
|
description = "Utilities for encoding and decoding frames"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
|
|
@ -217,17 +217,24 @@ impl IoRef {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
/// current timer handle
|
/// Wakeup dispatcher
|
||||||
pub fn timer_handle(&self) -> timer::TimerHandle {
|
pub fn notify_dispatcher(&self) {
|
||||||
self.0.timeout.get()
|
self.0.dispatch_task.wake();
|
||||||
|
log::trace!("{}: Timer, notify dispatcher", self.tag());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
/// wakeup dispatcher and send keep-alive error
|
/// Wakeup dispatcher and send keep-alive error
|
||||||
pub fn notify_timeout(&self) {
|
pub fn notify_timeout(&self) {
|
||||||
self.0.notify_timeout()
|
self.0.notify_timeout()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
/// current timer handle
|
||||||
|
pub fn timer_handle(&self) -> timer::TimerHandle {
|
||||||
|
self.0.timeout.get()
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
/// Start timer
|
/// Start timer
|
||||||
pub fn start_timer(&self, timeout: Seconds) -> timer::TimerHandle {
|
pub fn start_timer(&self, timeout: Seconds) -> timer::TimerHandle {
|
||||||
|
|
|
@ -1,5 +1,9 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [2.2.0] - 2024-08-12
|
||||||
|
|
||||||
|
* Http server gracefull shutdown support
|
||||||
|
|
||||||
## [2.1.0] - 2024-07-30
|
## [2.1.0] - 2024-07-30
|
||||||
|
|
||||||
* Better handling for connection upgrade #385
|
* Better handling for connection upgrade #385
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "ntex"
|
name = "ntex"
|
||||||
version = "2.1.0"
|
version = "2.2.0"
|
||||||
authors = ["ntex contributors <team@ntex.rs>"]
|
authors = ["ntex contributors <team@ntex.rs>"]
|
||||||
description = "Framework for composable network services"
|
description = "Framework for composable network services"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
|
@ -65,10 +65,10 @@ ntex-service = "3.0"
|
||||||
ntex-macros = "0.1.3"
|
ntex-macros = "0.1.3"
|
||||||
ntex-util = "2"
|
ntex-util = "2"
|
||||||
ntex-bytes = "0.1.27"
|
ntex-bytes = "0.1.27"
|
||||||
ntex-server = "2.1"
|
ntex-server = "2.3"
|
||||||
ntex-h2 = "1.0"
|
ntex-h2 = "1.1"
|
||||||
ntex-rt = "0.4.13"
|
ntex-rt = "0.4.13"
|
||||||
ntex-io = "2.1"
|
ntex-io = "2.2"
|
||||||
ntex-net = "2.0"
|
ntex-net = "2.0"
|
||||||
ntex-tls = "2.0"
|
ntex-tls = "2.0"
|
||||||
|
|
||||||
|
|
|
@ -234,13 +234,23 @@ impl ServiceConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bitflags::bitflags! {
|
||||||
|
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
|
||||||
|
struct Flags: u8 {
|
||||||
|
/// Keep-alive enabled
|
||||||
|
const KA_ENABLED = 0b0000_0001;
|
||||||
|
/// Shutdown service
|
||||||
|
const SHUTDOWN = 0b0000_0010;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub(super) struct DispatcherConfig<S, C> {
|
pub(super) struct DispatcherConfig<S, C> {
|
||||||
|
flags: Cell<Flags>,
|
||||||
pub(super) service: Pipeline<S>,
|
pub(super) service: Pipeline<S>,
|
||||||
pub(super) control: Pipeline<C>,
|
pub(super) control: Pipeline<C>,
|
||||||
pub(super) keep_alive: Seconds,
|
pub(super) keep_alive: Seconds,
|
||||||
pub(super) client_disconnect: Seconds,
|
pub(super) client_disconnect: Seconds,
|
||||||
pub(super) h2config: h2::Config,
|
pub(super) h2config: h2::Config,
|
||||||
pub(super) ka_enabled: bool,
|
|
||||||
pub(super) headers_read_rate: Option<ReadRate>,
|
pub(super) headers_read_rate: Option<ReadRate>,
|
||||||
pub(super) payload_read_rate: Option<ReadRate>,
|
pub(super) payload_read_rate: Option<ReadRate>,
|
||||||
pub(super) timer: DateService,
|
pub(super) timer: DateService,
|
||||||
|
@ -253,22 +263,39 @@ impl<S, C> DispatcherConfig<S, C> {
|
||||||
control: control.into(),
|
control: control.into(),
|
||||||
keep_alive: cfg.keep_alive,
|
keep_alive: cfg.keep_alive,
|
||||||
client_disconnect: cfg.client_disconnect,
|
client_disconnect: cfg.client_disconnect,
|
||||||
ka_enabled: cfg.ka_enabled,
|
|
||||||
headers_read_rate: cfg.headers_read_rate,
|
headers_read_rate: cfg.headers_read_rate,
|
||||||
payload_read_rate: cfg.payload_read_rate,
|
payload_read_rate: cfg.payload_read_rate,
|
||||||
h2config: cfg.h2config.clone(),
|
h2config: cfg.h2config.clone(),
|
||||||
timer: cfg.timer.clone(),
|
timer: cfg.timer.clone(),
|
||||||
|
flags: Cell::new(if cfg.ka_enabled {
|
||||||
|
Flags::KA_ENABLED
|
||||||
|
} else {
|
||||||
|
Flags::empty()
|
||||||
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return state of connection keep-alive functionality
|
/// Return state of connection keep-alive functionality
|
||||||
pub(super) fn keep_alive_enabled(&self) -> bool {
|
pub(super) fn keep_alive_enabled(&self) -> bool {
|
||||||
self.ka_enabled
|
self.flags.get().contains(Flags::KA_ENABLED)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn headers_read_rate(&self) -> Option<&ReadRate> {
|
pub(super) fn headers_read_rate(&self) -> Option<&ReadRate> {
|
||||||
self.headers_read_rate.as_ref()
|
self.headers_read_rate.as_ref()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Service is shuting down
|
||||||
|
pub(super) fn is_shutdown(&self) -> bool {
|
||||||
|
self.flags.get().contains(Flags::SHUTDOWN)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn shutdown(&self) {
|
||||||
|
self.h2config.shutdown();
|
||||||
|
|
||||||
|
let mut flags = self.flags.get();
|
||||||
|
flags.insert(Flags::SHUTDOWN);
|
||||||
|
self.flags.set(flags);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const DATE_VALUE_LENGTH_HDR: usize = 39;
|
const DATE_VALUE_LENGTH_HDR: usize = 39;
|
||||||
|
|
|
@ -223,6 +223,12 @@ where
|
||||||
B: MessageBody,
|
B: MessageBody,
|
||||||
{
|
{
|
||||||
fn poll_read_request(&mut self, cx: &mut Context<'_>) -> Poll<State<F, C, S, B>> {
|
fn poll_read_request(&mut self, cx: &mut Context<'_>) -> Poll<State<F, C, S, B>> {
|
||||||
|
// stop dispatcher
|
||||||
|
if self.config.is_shutdown() {
|
||||||
|
log::trace!("{}: Service is shutting down", self.io.tag());
|
||||||
|
return Poll::Ready(self.stop());
|
||||||
|
}
|
||||||
|
|
||||||
log::trace!("{}: Trying to read http message", self.io.tag());
|
log::trace!("{}: Trying to read http message", self.io.tag());
|
||||||
|
|
||||||
let result = match self.io.poll_recv_decode(&self.codec, cx) {
|
let result = match self.io.poll_recv_decode(&self.codec, cx) {
|
||||||
|
|
|
@ -1,12 +1,12 @@
|
||||||
use std::{error::Error, fmt, marker, rc::Rc};
|
use std::{cell::Cell, cell::RefCell, error::Error, fmt, marker, rc::Rc};
|
||||||
|
|
||||||
use crate::http::body::MessageBody;
|
use crate::http::body::MessageBody;
|
||||||
use crate::http::config::{DispatcherConfig, ServiceConfig};
|
use crate::http::config::{DispatcherConfig, ServiceConfig};
|
||||||
use crate::http::error::{DispatchError, ResponseError};
|
use crate::http::error::{DispatchError, ResponseError};
|
||||||
use crate::http::{request::Request, response::Response};
|
use crate::http::{request::Request, response::Response};
|
||||||
use crate::io::{types, Filter, Io};
|
use crate::io::{types, Filter, Io, IoRef};
|
||||||
use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
|
use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
|
||||||
use crate::util::join;
|
use crate::{channel::oneshot, util::join, util::HashSet};
|
||||||
|
|
||||||
use super::control::{Control, ControlAck};
|
use super::control::{Control, ControlAck};
|
||||||
use super::default::DefaultControlService;
|
use super::default::DefaultControlService;
|
||||||
|
@ -181,10 +181,14 @@ where
|
||||||
.await
|
.await
|
||||||
.map_err(|e| log::error!("Cannot construct control service: {:?}", e))?;
|
.map_err(|e| log::error!("Cannot construct control service: {:?}", e))?;
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
let config = Rc::new(DispatcherConfig::new(self.cfg.clone(), service, control));
|
let config = Rc::new(DispatcherConfig::new(self.cfg.clone(), service, control));
|
||||||
|
|
||||||
Ok(H1ServiceHandler {
|
Ok(H1ServiceHandler {
|
||||||
config,
|
config,
|
||||||
|
inflight: RefCell::new(Default::default()),
|
||||||
|
rx: Cell::new(Some(rx)),
|
||||||
|
tx: Cell::new(Some(tx)),
|
||||||
_t: marker::PhantomData,
|
_t: marker::PhantomData,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -193,6 +197,9 @@ where
|
||||||
/// `Service` implementation for HTTP1 transport
|
/// `Service` implementation for HTTP1 transport
|
||||||
pub struct H1ServiceHandler<F, S, B, C> {
|
pub struct H1ServiceHandler<F, S, B, C> {
|
||||||
config: Rc<DispatcherConfig<S, C>>,
|
config: Rc<DispatcherConfig<S, C>>,
|
||||||
|
inflight: RefCell<HashSet<IoRef>>,
|
||||||
|
rx: Cell<Option<oneshot::Receiver<()>>>,
|
||||||
|
tx: Cell<Option<oneshot::Sender<()>>>,
|
||||||
_t: marker::PhantomData<(F, B)>,
|
_t: marker::PhantomData<(F, B)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -224,18 +231,62 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn shutdown(&self) {
|
async fn shutdown(&self) {
|
||||||
self.config.control.shutdown().await;
|
self.config.shutdown();
|
||||||
self.config.service.shutdown().await;
|
|
||||||
|
// check inflight connections
|
||||||
|
let inflight = {
|
||||||
|
let inflight = self.inflight.borrow();
|
||||||
|
for io in inflight.iter() {
|
||||||
|
io.notify_dispatcher();
|
||||||
|
}
|
||||||
|
inflight.len()
|
||||||
|
};
|
||||||
|
if inflight != 0 {
|
||||||
|
log::trace!("Shutting down service, in-flight connections: {}", inflight);
|
||||||
|
|
||||||
|
if let Some(rx) = self.rx.take() {
|
||||||
|
let _ = rx.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
log::trace!("Shutting down is complected",);
|
||||||
|
}
|
||||||
|
|
||||||
|
join(
|
||||||
|
self.config.control.shutdown(),
|
||||||
|
self.config.service.shutdown(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn call(&self, io: Io<F>, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
async fn call(&self, io: Io<F>, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
||||||
log::trace!(
|
let inflight = {
|
||||||
"New http1 connection, peer address {:?}",
|
let mut inflight = self.inflight.borrow_mut();
|
||||||
io.query::<types::PeerAddr>().get()
|
inflight.insert(io.get_ref());
|
||||||
);
|
inflight.len()
|
||||||
|
};
|
||||||
|
|
||||||
Dispatcher::new(io, self.config.clone())
|
log::trace!(
|
||||||
|
"New http1 connection, peer address {:?}, inflight: {}",
|
||||||
|
io.query::<types::PeerAddr>().get(),
|
||||||
|
inflight
|
||||||
|
);
|
||||||
|
let ioref = io.get_ref();
|
||||||
|
|
||||||
|
let result = Dispatcher::new(io, self.config.clone())
|
||||||
.await
|
.await
|
||||||
.map_err(DispatchError::Control)
|
.map_err(DispatchError::Control);
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut inflight = self.inflight.borrow_mut();
|
||||||
|
inflight.remove(&ioref);
|
||||||
|
|
||||||
|
if inflight.len() == 0 {
|
||||||
|
if let Some(tx) = self.tx.take() {
|
||||||
|
let _ = tx.send(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
use std::{cell::RefCell, error::Error, fmt, future::poll_fn, io, marker, mem, rc::Rc};
|
use std::cell::{Cell, RefCell};
|
||||||
|
use std::{error::Error, fmt, future::poll_fn, io, marker, mem, rc::Rc};
|
||||||
|
|
||||||
use ntex_h2::{self as h2, frame::StreamId, server};
|
use ntex_h2::{self as h2, frame::StreamId, server};
|
||||||
|
|
||||||
|
use crate::channel::oneshot;
|
||||||
use crate::http::body::{BodySize, MessageBody};
|
use crate::http::body::{BodySize, MessageBody};
|
||||||
use crate::http::config::{DispatcherConfig, ServiceConfig};
|
use crate::http::config::{DispatcherConfig, ServiceConfig};
|
||||||
use crate::http::error::{DispatchError, H2Error, ResponseError};
|
use crate::http::error::{DispatchError, H2Error, ResponseError};
|
||||||
|
@ -10,7 +12,7 @@ use crate::http::message::{CurrentIo, ResponseHead};
|
||||||
use crate::http::{DateService, Method, Request, Response, StatusCode, Uri, Version};
|
use crate::http::{DateService, Method, Request, Response, StatusCode, Uri, Version};
|
||||||
use crate::io::{types, Filter, Io, IoBoxed, IoRef};
|
use crate::io::{types, Filter, Io, IoBoxed, IoRef};
|
||||||
use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
|
use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
|
||||||
use crate::util::{Bytes, BytesMut, HashMap};
|
use crate::util::{Bytes, BytesMut, HashMap, HashSet};
|
||||||
|
|
||||||
use super::payload::{Payload, PayloadSender};
|
use super::payload::{Payload, PayloadSender};
|
||||||
use super::DefaultControlService;
|
use super::DefaultControlService;
|
||||||
|
@ -177,11 +179,16 @@ where
|
||||||
.create(())
|
.create(())
|
||||||
.await
|
.await
|
||||||
.map_err(|e| log::error!("Cannot construct publish service: {:?}", e))?;
|
.map_err(|e| log::error!("Cannot construct publish service: {:?}", e))?;
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
let config = Rc::new(DispatcherConfig::new(self.cfg.clone(), service, ()));
|
let config = Rc::new(DispatcherConfig::new(self.cfg.clone(), service, ()));
|
||||||
|
|
||||||
Ok(H2ServiceHandler {
|
Ok(H2ServiceHandler {
|
||||||
config,
|
config,
|
||||||
control: self.ctl.clone(),
|
control: self.ctl.clone(),
|
||||||
|
inflight: RefCell::new(Default::default()),
|
||||||
|
rx: Cell::new(Some(rx)),
|
||||||
|
tx: Cell::new(Some(tx)),
|
||||||
_t: marker::PhantomData,
|
_t: marker::PhantomData,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -191,6 +198,9 @@ where
|
||||||
pub struct H2ServiceHandler<F, S: Service<Request>, B, C> {
|
pub struct H2ServiceHandler<F, S: Service<Request>, B, C> {
|
||||||
config: Rc<DispatcherConfig<S, ()>>,
|
config: Rc<DispatcherConfig<S, ()>>,
|
||||||
control: Rc<C>,
|
control: Rc<C>,
|
||||||
|
inflight: RefCell<HashSet<IoRef>>,
|
||||||
|
rx: Cell<Option<oneshot::Receiver<()>>>,
|
||||||
|
tx: Cell<Option<oneshot::Sender<()>>>,
|
||||||
_t: marker::PhantomData<(F, B)>,
|
_t: marker::PhantomData<(F, B)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -218,6 +228,25 @@ where
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
async fn shutdown(&self) {
|
async fn shutdown(&self) {
|
||||||
|
self.config.shutdown();
|
||||||
|
|
||||||
|
// check inflight connections
|
||||||
|
let inflight = {
|
||||||
|
let inflight = self.inflight.borrow();
|
||||||
|
for io in inflight.iter() {
|
||||||
|
io.notify_dispatcher();
|
||||||
|
}
|
||||||
|
inflight.len()
|
||||||
|
};
|
||||||
|
if inflight != 0 {
|
||||||
|
log::trace!("Shutting down service, in-flight connections: {}", inflight);
|
||||||
|
if let Some(rx) = self.rx.take() {
|
||||||
|
let _ = rx.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
log::trace!("Shutting down is complected",);
|
||||||
|
}
|
||||||
|
|
||||||
self.config.service.shutdown().await
|
self.config.service.shutdown().await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -226,9 +255,16 @@ where
|
||||||
io: Io<F>,
|
io: Io<F>,
|
||||||
_: ServiceCtx<'_, Self>,
|
_: ServiceCtx<'_, Self>,
|
||||||
) -> Result<Self::Response, Self::Error> {
|
) -> Result<Self::Response, Self::Error> {
|
||||||
|
let inflight = {
|
||||||
|
let mut inflight = self.inflight.borrow_mut();
|
||||||
|
inflight.insert(io.get_ref());
|
||||||
|
inflight.len()
|
||||||
|
};
|
||||||
|
|
||||||
log::trace!(
|
log::trace!(
|
||||||
"New http2 connection, peer address {:?}",
|
"New http2 connection, peer address {:?}, inflight: {}",
|
||||||
io.query::<types::PeerAddr>().get()
|
io.query::<types::PeerAddr>().get(),
|
||||||
|
inflight
|
||||||
);
|
);
|
||||||
let control = self.control.create(()).await.map_err(|e| {
|
let control = self.control.create(()).await.map_err(|e| {
|
||||||
DispatchError::Control(
|
DispatchError::Control(
|
||||||
|
@ -236,7 +272,20 @@ where
|
||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
handle(io.into(), control, self.config.clone()).await
|
let ioref = io.get_ref();
|
||||||
|
let result = handle(io.into(), control, self.config.clone()).await;
|
||||||
|
{
|
||||||
|
let mut inflight = self.inflight.borrow_mut();
|
||||||
|
inflight.remove(&ioref);
|
||||||
|
|
||||||
|
if inflight.len() == 0 {
|
||||||
|
if let Some(tx) = self.tx.take() {
|
||||||
|
let _ = tx.send(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
use std::{error, fmt, marker, rc::Rc};
|
use std::{cell::Cell, cell::RefCell, error, fmt, marker, rc::Rc};
|
||||||
|
|
||||||
use crate::io::{types, Filter, Io};
|
use crate::io::{types, Filter, Io, IoRef};
|
||||||
use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
|
use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
|
||||||
use crate::util::join;
|
use crate::{channel::oneshot, util::join, util::HashSet};
|
||||||
|
|
||||||
use super::body::MessageBody;
|
use super::body::MessageBody;
|
||||||
use super::builder::HttpServiceBuilder;
|
use super::builder::HttpServiceBuilder;
|
||||||
|
@ -257,11 +257,15 @@ where
|
||||||
.await
|
.await
|
||||||
.map_err(|e| log::error!("Cannot construct control service: {:?}", e))?;
|
.map_err(|e| log::error!("Cannot construct control service: {:?}", e))?;
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
let config = DispatcherConfig::new(self.cfg.clone(), service, control);
|
let config = DispatcherConfig::new(self.cfg.clone(), service, control);
|
||||||
|
|
||||||
Ok(HttpServiceHandler {
|
Ok(HttpServiceHandler {
|
||||||
config: Rc::new(config),
|
config: Rc::new(config),
|
||||||
h2_control: self.h2_control.clone(),
|
h2_control: self.h2_control.clone(),
|
||||||
|
inflight: RefCell::new(HashSet::default()),
|
||||||
|
rx: Cell::new(Some(rx)),
|
||||||
|
tx: Cell::new(Some(tx)),
|
||||||
_t: marker::PhantomData,
|
_t: marker::PhantomData,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -271,6 +275,9 @@ where
|
||||||
pub struct HttpServiceHandler<F, S, B, C1, C2> {
|
pub struct HttpServiceHandler<F, S, B, C1, C2> {
|
||||||
config: Rc<DispatcherConfig<S, C1>>,
|
config: Rc<DispatcherConfig<S, C1>>,
|
||||||
h2_control: Rc<C2>,
|
h2_control: Rc<C2>,
|
||||||
|
inflight: RefCell<HashSet<IoRef>>,
|
||||||
|
rx: Cell<Option<oneshot::Receiver<()>>>,
|
||||||
|
tx: Cell<Option<oneshot::Sender<()>>>,
|
||||||
_t: marker::PhantomData<(F, B)>,
|
_t: marker::PhantomData<(F, B)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -306,8 +313,31 @@ where
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
async fn shutdown(&self) {
|
async fn shutdown(&self) {
|
||||||
self.config.control.shutdown().await;
|
self.config.shutdown();
|
||||||
self.config.service.shutdown().await;
|
|
||||||
|
// check inflight connections
|
||||||
|
let inflight = {
|
||||||
|
let inflight = self.inflight.borrow();
|
||||||
|
for io in inflight.iter() {
|
||||||
|
io.notify_dispatcher();
|
||||||
|
}
|
||||||
|
inflight.len()
|
||||||
|
};
|
||||||
|
if inflight != 0 {
|
||||||
|
log::trace!("Shutting down service, in-flight connections: {}", inflight);
|
||||||
|
|
||||||
|
if let Some(rx) = self.rx.take() {
|
||||||
|
let _ = rx.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
log::trace!("Shutting down is complected",);
|
||||||
|
}
|
||||||
|
|
||||||
|
join(
|
||||||
|
self.config.control.shutdown(),
|
||||||
|
self.config.service.shutdown(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn call(
|
async fn call(
|
||||||
|
@ -315,12 +345,22 @@ where
|
||||||
io: Io<F>,
|
io: Io<F>,
|
||||||
_: ServiceCtx<'_, Self>,
|
_: ServiceCtx<'_, Self>,
|
||||||
) -> Result<Self::Response, Self::Error> {
|
) -> Result<Self::Response, Self::Error> {
|
||||||
log::trace!(
|
let inflight = {
|
||||||
"New http connection, peer address {:?}",
|
let mut inflight = self.inflight.borrow_mut();
|
||||||
io.query::<types::PeerAddr>().get()
|
inflight.insert(io.get_ref());
|
||||||
);
|
inflight.len()
|
||||||
|
};
|
||||||
|
|
||||||
if io.query::<types::HttpProtocol>().get() == Some(types::HttpProtocol::Http2) {
|
log::trace!(
|
||||||
|
"New http connection, peer address {:?}, in-flight: {}",
|
||||||
|
io.query::<types::PeerAddr>().get(),
|
||||||
|
inflight
|
||||||
|
);
|
||||||
|
let ioref = io.get_ref();
|
||||||
|
|
||||||
|
let result = if io.query::<types::HttpProtocol>().get()
|
||||||
|
== Some(types::HttpProtocol::Http2)
|
||||||
|
{
|
||||||
let control = self.h2_control.create(()).await.map_err(|e| {
|
let control = self.h2_control.create(()).await.map_err(|e| {
|
||||||
DispatchError::Control(
|
DispatchError::Control(
|
||||||
format!("Cannot construct control service: {:?}", e).into(),
|
format!("Cannot construct control service: {:?}", e).into(),
|
||||||
|
@ -331,6 +371,19 @@ where
|
||||||
h1::Dispatcher::new(io, self.config.clone())
|
h1::Dispatcher::new(io, self.config.clone())
|
||||||
.await
|
.await
|
||||||
.map_err(DispatchError::Control)
|
.map_err(DispatchError::Control)
|
||||||
|
};
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut inflight = self.inflight.borrow_mut();
|
||||||
|
inflight.remove(&ioref);
|
||||||
|
|
||||||
|
if inflight.len() == 0 {
|
||||||
|
if let Some(tx) = self.tx.take() {
|
||||||
|
let _ = tx.send(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ use coo_kie::{Cookie, CookieJar};
|
||||||
#[cfg(feature = "ws")]
|
#[cfg(feature = "ws")]
|
||||||
use crate::io::Filter;
|
use crate::io::Filter;
|
||||||
use crate::io::Io;
|
use crate::io::Io;
|
||||||
|
use crate::server::Server;
|
||||||
#[cfg(feature = "ws")]
|
#[cfg(feature = "ws")]
|
||||||
use crate::ws::{error::WsClientError, WsClient, WsConnection};
|
use crate::ws::{error::WsClientError, WsClient, WsConnection};
|
||||||
use crate::{rt::System, service::ServiceFactory};
|
use crate::{rt::System, service::ServiceFactory};
|
||||||
|
@ -237,18 +238,18 @@ where
|
||||||
|
|
||||||
let system = sys.system();
|
let system = sys.system();
|
||||||
sys.run(move || {
|
sys.run(move || {
|
||||||
crate::server::build()
|
let srv = crate::server::build()
|
||||||
.listen("test", tcp, move |_| factory())?
|
.listen("test", tcp, move |_| factory())?
|
||||||
.set_tag("test", "HTTP-TEST-SRV")
|
.set_tag("test", "HTTP-TEST-SRV")
|
||||||
.workers(1)
|
.workers(1)
|
||||||
.disable_signals()
|
.disable_signals()
|
||||||
.run();
|
.run();
|
||||||
tx.send((system, local_addr)).unwrap();
|
tx.send((system, srv, local_addr)).unwrap();
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
let (system, addr) = rx.recv().unwrap();
|
let (system, server, addr) = rx.recv().unwrap();
|
||||||
|
|
||||||
let client = {
|
let client = {
|
||||||
let connector = {
|
let connector = {
|
||||||
|
@ -286,6 +287,7 @@ where
|
||||||
addr,
|
addr,
|
||||||
client,
|
client,
|
||||||
system,
|
system,
|
||||||
|
server,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -295,6 +297,7 @@ pub struct TestServer {
|
||||||
addr: net::SocketAddr,
|
addr: net::SocketAddr,
|
||||||
client: Client,
|
client: Client,
|
||||||
system: System,
|
system: System,
|
||||||
|
server: Server,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TestServer {
|
impl TestServer {
|
||||||
|
@ -402,13 +405,14 @@ impl TestServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stop http server
|
/// Stop http server
|
||||||
fn stop(&mut self) {
|
pub async fn stop(&self) {
|
||||||
|
self.server.stop(true).await;
|
||||||
self.system.stop();
|
self.system.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for TestServer {
|
impl Drop for TestServer {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.stop()
|
self.system.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,7 @@ use ntex::http::{body, h1, HttpService, Method, Request, Response, StatusCode, V
|
||||||
use ntex::service::{fn_service, ServiceFactory};
|
use ntex::service::{fn_service, ServiceFactory};
|
||||||
use ntex::time::{sleep, timeout, Millis, Seconds};
|
use ntex::time::{sleep, timeout, Millis, Seconds};
|
||||||
use ntex::util::{Bytes, BytesMut, Ready};
|
use ntex::util::{Bytes, BytesMut, Ready};
|
||||||
use ntex::{web::error::InternalError, ws, ws::handshake_response};
|
use ntex::{channel::oneshot, rt, web::error::InternalError, ws, ws::handshake_response};
|
||||||
|
|
||||||
async fn load_body<S>(stream: S) -> Result<BytesMut, PayloadError>
|
async fn load_body<S>(stream: S) -> Result<BytesMut, PayloadError>
|
||||||
where
|
where
|
||||||
|
@ -534,3 +534,50 @@ async fn test_ws_transport() {
|
||||||
}))
|
}))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[ntex::test]
|
||||||
|
async fn test_h2_graceful_shutdown() -> io::Result<()> {
|
||||||
|
let count = Arc::new(AtomicUsize::new(0));
|
||||||
|
let count2 = count.clone();
|
||||||
|
|
||||||
|
let srv = test_server(move || {
|
||||||
|
let count = count2.clone();
|
||||||
|
HttpService::build()
|
||||||
|
.h2(move |_| {
|
||||||
|
let count = count.clone();
|
||||||
|
count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
async move {
|
||||||
|
sleep(Millis(1000)).await;
|
||||||
|
count.fetch_sub(1, Ordering::Relaxed);
|
||||||
|
Ok::<_, io::Error>(Response::Ok().finish())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.openssl(ssl_acceptor())
|
||||||
|
.map_err(|_| ())
|
||||||
|
});
|
||||||
|
|
||||||
|
let req = srv.srequest(Method::GET, "/");
|
||||||
|
rt::spawn(async move {
|
||||||
|
let _ = req.send().await.unwrap();
|
||||||
|
sleep(Millis(100000)).await;
|
||||||
|
});
|
||||||
|
let req = srv.srequest(Method::GET, "/");
|
||||||
|
rt::spawn(async move {
|
||||||
|
let _ = req.send().await.unwrap();
|
||||||
|
sleep(Millis(100000)).await;
|
||||||
|
});
|
||||||
|
sleep(Millis(150)).await;
|
||||||
|
assert_eq!(count.load(Ordering::Relaxed), 2);
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
rt::spawn(async move {
|
||||||
|
srv.stop().await;
|
||||||
|
let _ = tx.send(());
|
||||||
|
});
|
||||||
|
sleep(Millis(150)).await;
|
||||||
|
assert_eq!(count.load(Ordering::Relaxed), 2);
|
||||||
|
|
||||||
|
let _ = rx.await;
|
||||||
|
assert_eq!(count.load(Ordering::Relaxed), 0);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
|
@ -9,7 +9,9 @@ use ntex::http::header::{self, HeaderName, HeaderValue};
|
||||||
use ntex::http::{body, h1::Control, test::server as test_server};
|
use ntex::http::{body, h1::Control, test::server as test_server};
|
||||||
use ntex::http::{HttpService, KeepAlive, Method, Request, Response, StatusCode, Version};
|
use ntex::http::{HttpService, KeepAlive, Method, Request, Response, StatusCode, Version};
|
||||||
use ntex::time::{sleep, timeout, Millis, Seconds};
|
use ntex::time::{sleep, timeout, Millis, Seconds};
|
||||||
use ntex::{service::fn_service, util::Bytes, util::Ready, web::error};
|
use ntex::{
|
||||||
|
channel::oneshot, rt, service::fn_service, util::Bytes, util::Ready, web::error,
|
||||||
|
};
|
||||||
|
|
||||||
#[ntex::test]
|
#[ntex::test]
|
||||||
async fn test_h1() {
|
async fn test_h1() {
|
||||||
|
@ -724,3 +726,81 @@ async fn test_h1_client_drop() -> io::Result<()> {
|
||||||
assert_eq!(count.load(Ordering::Relaxed), 1);
|
assert_eq!(count.load(Ordering::Relaxed), 1);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[ntex::test]
|
||||||
|
async fn test_h1_gracefull_shutdown() {
|
||||||
|
let count = Arc::new(AtomicUsize::new(0));
|
||||||
|
let count2 = count.clone();
|
||||||
|
|
||||||
|
let srv = test_server(move || {
|
||||||
|
let count = count2.clone();
|
||||||
|
HttpService::build().h1(move |_: Request| {
|
||||||
|
let count = count.clone();
|
||||||
|
count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
async move {
|
||||||
|
sleep(Millis(1000)).await;
|
||||||
|
count.fetch_sub(1, Ordering::Relaxed);
|
||||||
|
Ok::<_, io::Error>(Response::Ok().finish())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut stream1 = net::TcpStream::connect(srv.addr()).unwrap();
|
||||||
|
let _ = stream1.write_all(b"GET /index.html HTTP/1.1\r\n\r\n");
|
||||||
|
|
||||||
|
let mut stream2 = net::TcpStream::connect(srv.addr()).unwrap();
|
||||||
|
let _ = stream2.write_all(b"GET /index.html HTTP/1.1\r\n\r\n");
|
||||||
|
|
||||||
|
sleep(Millis(150)).await;
|
||||||
|
assert_eq!(count.load(Ordering::Relaxed), 2);
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
rt::spawn(async move {
|
||||||
|
srv.stop().await;
|
||||||
|
let _ = tx.send(());
|
||||||
|
});
|
||||||
|
sleep(Millis(150)).await;
|
||||||
|
assert_eq!(count.load(Ordering::Relaxed), 2);
|
||||||
|
|
||||||
|
let _ = rx.await;
|
||||||
|
assert_eq!(count.load(Ordering::Relaxed), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[ntex::test]
|
||||||
|
async fn test_h1_gracefull_shutdown_2() {
|
||||||
|
let count = Arc::new(AtomicUsize::new(0));
|
||||||
|
let count2 = count.clone();
|
||||||
|
|
||||||
|
let srv = test_server(move || {
|
||||||
|
let count = count2.clone();
|
||||||
|
HttpService::build().finish(move |_: Request| {
|
||||||
|
let count = count.clone();
|
||||||
|
count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
async move {
|
||||||
|
sleep(Millis(1000)).await;
|
||||||
|
count.fetch_sub(1, Ordering::Relaxed);
|
||||||
|
Ok::<_, io::Error>(Response::Ok().finish())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut stream1 = net::TcpStream::connect(srv.addr()).unwrap();
|
||||||
|
let _ = stream1.write_all(b"GET /index.html HTTP/1.1\r\n\r\n");
|
||||||
|
|
||||||
|
let mut stream2 = net::TcpStream::connect(srv.addr()).unwrap();
|
||||||
|
let _ = stream2.write_all(b"GET /index.html HTTP/1.1\r\n\r\n");
|
||||||
|
|
||||||
|
sleep(Millis(150)).await;
|
||||||
|
assert_eq!(count.load(Ordering::Relaxed), 2);
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
rt::spawn(async move {
|
||||||
|
srv.stop().await;
|
||||||
|
let _ = tx.send(());
|
||||||
|
});
|
||||||
|
sleep(Millis(150)).await;
|
||||||
|
assert_eq!(count.load(Ordering::Relaxed), 2);
|
||||||
|
|
||||||
|
let _ = rx.await;
|
||||||
|
assert_eq!(count.load(Ordering::Relaxed), 0);
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue