mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-03 04:47:39 +03:00
Better write back-pressure handling (#359)
This commit is contained in:
parent
33c8900172
commit
55dcad5f27
5 changed files with 70 additions and 36 deletions
|
@ -1,5 +1,11 @@
|
|||
# Changes
|
||||
|
||||
## [1.2.0] - 2024-05-12
|
||||
|
||||
* Better write back-pressure handling
|
||||
|
||||
* Dispatcher optimization for handling first item
|
||||
|
||||
## [1.1.0] - 2024-05-01
|
||||
|
||||
* Add IoRef::notify_timeout() helper method
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "ntex-io"
|
||||
version = "1.1.0"
|
||||
version = "1.2.0"
|
||||
authors = ["ntex contributors <team@ntex.rs>"]
|
||||
description = "Utilities for encoding and decoding frames"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
//! Framed transport dispatcher
|
||||
#![allow(clippy::let_underscore_future)]
|
||||
use std::{cell::Cell, future, pin::Pin, rc::Rc, task::Context, task::Poll};
|
||||
use std::{cell::Cell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
|
||||
|
||||
use ntex_bytes::Pool;
|
||||
use ntex_codec::{Decoder, Encoder};
|
||||
use ntex_service::{IntoService, Pipeline, Service};
|
||||
use ntex_service::{IntoService, Pipeline, PipelineCall, Service};
|
||||
use ntex_util::{future::Either, ready, spawn, time::Seconds};
|
||||
|
||||
use crate::{Decoded, DispatchItem, IoBoxed, IoStatusUpdate, RecvError};
|
||||
|
@ -117,6 +117,7 @@ pin_project_lite::pin_project! {
|
|||
S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
|
||||
U: Encoder,
|
||||
U: Decoder,
|
||||
U: 'static,
|
||||
{
|
||||
inner: DispatcherInner<S, U>,
|
||||
}
|
||||
|
@ -136,12 +137,13 @@ bitflags::bitflags! {
|
|||
struct DispatcherInner<S, U>
|
||||
where
|
||||
S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
|
||||
U: Encoder + Decoder,
|
||||
U: Encoder + Decoder + 'static,
|
||||
{
|
||||
st: DispatcherState,
|
||||
error: Option<S::Error>,
|
||||
flags: Flags,
|
||||
shared: Rc<DispatcherShared<S, U>>,
|
||||
response: Option<PipelineCall<S, DispatchItem<U>>>,
|
||||
pool: Pool,
|
||||
cfg: DispatcherConfig,
|
||||
read_remains: u32,
|
||||
|
@ -193,7 +195,7 @@ impl<S, U> From<Either<S, U>> for DispatcherError<S, U> {
|
|||
impl<S, U> Dispatcher<S, U>
|
||||
where
|
||||
S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
|
||||
U: Decoder + Encoder,
|
||||
U: Decoder + Encoder + 'static,
|
||||
{
|
||||
/// Construct new `Dispatcher` instance.
|
||||
pub fn new<Io, F>(
|
||||
|
@ -230,6 +232,7 @@ where
|
|||
shared,
|
||||
flags,
|
||||
cfg: cfg.clone(),
|
||||
response: None,
|
||||
error: None,
|
||||
read_remains: 0,
|
||||
read_remains_prev: 0,
|
||||
|
@ -245,8 +248,7 @@ where
|
|||
S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
|
||||
U: Encoder + Decoder,
|
||||
{
|
||||
fn handle_result(&self, item: Result<S::Response, S::Error>, io: &IoBoxed) {
|
||||
self.inflight.set(self.inflight.get() - 1);
|
||||
fn handle_result(&self, item: Result<S::Response, S::Error>, io: &IoBoxed, wake: bool) {
|
||||
match item {
|
||||
Ok(Some(val)) => {
|
||||
if let Err(err) = io.encode(val, &self.codec) {
|
||||
|
@ -256,11 +258,14 @@ where
|
|||
Err(err) => self.error.set(Some(DispatcherError::Service(err))),
|
||||
Ok(None) => (),
|
||||
}
|
||||
io.wake();
|
||||
self.inflight.set(self.inflight.get() - 1);
|
||||
if wake {
|
||||
io.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, U> future::Future for Dispatcher<S, U>
|
||||
impl<S, U> Future for Dispatcher<S, U>
|
||||
where
|
||||
S: Service<DispatchItem<U>, Response = Option<Response<U>>> + 'static,
|
||||
U: Decoder + Encoder + 'static,
|
||||
|
@ -271,6 +276,14 @@ where
|
|||
let mut this = self.as_mut().project();
|
||||
let slf = &mut this.inner;
|
||||
|
||||
// handle service response future
|
||||
if let Some(fut) = slf.response.as_mut() {
|
||||
if let Poll::Ready(item) = Pin::new(fut).poll(cx) {
|
||||
slf.shared.handle_result(item, &slf.shared.io, false);
|
||||
slf.response = None;
|
||||
}
|
||||
}
|
||||
|
||||
// handle memory pool pressure
|
||||
if slf.pool.poll_ready(cx).is_pending() {
|
||||
slf.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT);
|
||||
|
@ -339,37 +352,25 @@ where
|
|||
PollService::Continue => continue,
|
||||
};
|
||||
|
||||
// call service
|
||||
let shared = slf.shared.clone();
|
||||
shared.inflight.set(shared.inflight.get() + 1);
|
||||
let _ = spawn(async move {
|
||||
let result = shared.service.call(item).await;
|
||||
shared.handle_result(result, &shared.io);
|
||||
});
|
||||
slf.call_service(cx, item);
|
||||
}
|
||||
// handle write back-pressure
|
||||
DispatcherState::Backpressure => {
|
||||
let item = match ready!(slf.poll_service(cx)) {
|
||||
PollService::Ready => {
|
||||
if let Err(err) = ready!(slf.shared.io.poll_flush(cx, false)) {
|
||||
slf.st = DispatcherState::Stop;
|
||||
DispatchItem::Disconnect(Some(err))
|
||||
} else {
|
||||
slf.st = DispatcherState::Processing;
|
||||
DispatchItem::WBackPressureDisabled
|
||||
}
|
||||
}
|
||||
PollService::Item(item) => item,
|
||||
match ready!(slf.poll_service(cx)) {
|
||||
PollService::Ready => (),
|
||||
PollService::Item(item) => slf.call_service(cx, item),
|
||||
PollService::Continue => continue,
|
||||
};
|
||||
|
||||
// call service
|
||||
let shared = slf.shared.clone();
|
||||
shared.inflight.set(shared.inflight.get() + 1);
|
||||
let _ = spawn(async move {
|
||||
let result = shared.service.call(item).await;
|
||||
shared.handle_result(result, &shared.io);
|
||||
});
|
||||
let item = if let Err(err) = ready!(slf.shared.io.poll_flush(cx, false))
|
||||
{
|
||||
slf.st = DispatcherState::Stop;
|
||||
DispatchItem::Disconnect(Some(err))
|
||||
} else {
|
||||
slf.st = DispatcherState::Processing;
|
||||
DispatchItem::WBackPressureDisabled
|
||||
};
|
||||
slf.call_service(cx, item);
|
||||
}
|
||||
// drain service responses and shutdown io
|
||||
DispatcherState::Stop => {
|
||||
|
@ -432,6 +433,26 @@ where
|
|||
S: Service<DispatchItem<U>, Response = Option<Response<U>>> + 'static,
|
||||
U: Decoder + Encoder + 'static,
|
||||
{
|
||||
fn call_service(&mut self, cx: &mut Context<'_>, item: DispatchItem<U>) {
|
||||
let mut fut = self.shared.service.call_static(item);
|
||||
self.shared.inflight.set(self.shared.inflight.get() + 1);
|
||||
|
||||
// optimize first call
|
||||
if self.response.is_none() {
|
||||
if let Poll::Ready(result) = Pin::new(&mut fut).poll(cx) {
|
||||
self.shared.handle_result(result, &self.shared.io, false);
|
||||
} else {
|
||||
self.response = Some(fut);
|
||||
}
|
||||
} else {
|
||||
let shared = self.shared.clone();
|
||||
let _ = spawn(async move {
|
||||
let result = fut.await;
|
||||
shared.handle_result(result, &shared.io, true);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll<PollService<U>> {
|
||||
match self.shared.service.poll_ready(cx) {
|
||||
Poll::Ready(Ok(_)) => {
|
||||
|
@ -493,7 +514,10 @@ where
|
|||
self.st = DispatcherState::Stop;
|
||||
Poll::Ready(PollService::Item(DispatchItem::Disconnect(err)))
|
||||
}
|
||||
IoStatusUpdate::WriteBackpressure => Poll::Pending,
|
||||
IoStatusUpdate::WriteBackpressure => {
|
||||
self.st = DispatcherState::Backpressure;
|
||||
Poll::Ready(PollService::Item(DispatchItem::WBackPressureEnabled))
|
||||
}
|
||||
}
|
||||
}
|
||||
// handle service readiness error
|
||||
|
@ -703,6 +727,7 @@ mod tests {
|
|||
inner: DispatcherInner {
|
||||
error: None,
|
||||
st: DispatcherState::Processing,
|
||||
response: None,
|
||||
read_remains: 0,
|
||||
read_remains_prev: 0,
|
||||
read_max_timeout: Seconds::ZERO,
|
||||
|
|
|
@ -32,6 +32,9 @@ pub use self::tasks::{ReadContext, WriteContext};
|
|||
pub use self::timer::TimerHandle;
|
||||
pub use self::utils::{seal, Decoded};
|
||||
|
||||
#[doc(hidden)]
|
||||
pub use self::io::Flags;
|
||||
|
||||
/// Status for read task
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
|
||||
pub enum ReadStatus {
|
||||
|
|
|
@ -67,7 +67,7 @@ ntex-bytes = "0.1.25"
|
|||
ntex-server = "1.0.5"
|
||||
ntex-h2 = "0.5.4"
|
||||
ntex-rt = "0.4.12"
|
||||
ntex-io = "1.1.0"
|
||||
ntex-io = "1.2.0"
|
||||
ntex-net = "1.0.1"
|
||||
ntex-tls = "1.1.0"
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue