refactor framed dispatcher write back-pressure support

This commit is contained in:
Nikolay Kim 2021-02-22 16:51:38 +06:00
parent e4483569b8
commit 92dacafe06
5 changed files with 179 additions and 109 deletions

View file

@ -6,6 +6,8 @@
* http: Add ClientResponse::header() method
* framed: Refactor write back-pressure support
## [0.2.0] - 2021-02-21
* 0.2 release

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.2.0"
version = "0.2.1"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"

View file

@ -57,8 +57,7 @@ where
#[derive(Copy, Clone, Debug)]
enum DispatcherState {
Processing,
WrEnabled,
WrWaitReady,
Backpressure,
Stop,
Shutdown,
}
@ -216,78 +215,20 @@ where
Poll::Ready(item) => {
this.fut.set(None);
slf.shared.inflight.set(slf.shared.inflight.get() - 1);
let _ = slf.handle_result(item, cx);
let _ = slf.handle_result(item);
}
}
}
loop {
match slf.st.get() {
DispatcherState::WrEnabled => {
let item = match ready!(slf.poll_service(&this.service, cx)) {
PollService::Ready => {
slf.st.set(DispatcherState::WrWaitReady);
DispatchItem::WBackPressureEnabled
}
PollService::Item(item) => item,
PollService::ServiceError => continue,
};
// call service
if this.fut.is_none() {
// optimize first service call
this.fut.set(Some(this.service.call(item)));
match this.fut.as_mut().as_pin_mut().unwrap().poll(cx) {
Poll::Ready(res) => {
this.fut.set(None);
ready!(slf.handle_result(res, cx));
}
Poll::Pending => {
slf.shared.inflight.set(slf.shared.inflight.get() + 1)
}
}
} else {
slf.spawn_service_call(this.service.call(item));
}
}
DispatcherState::WrWaitReady => {
let item = match ready!(slf.poll_service(&this.service, cx)) {
PollService::Ready => {
if state.is_write_backpressure_disabled() {
slf.st.set(DispatcherState::Processing);
DispatchItem::WBackPressureDisabled
} else {
return Poll::Pending;
}
}
PollService::Item(item) => item,
PollService::ServiceError => continue,
};
// call service
if this.fut.is_none() {
// optimize first service call
this.fut.set(Some(this.service.call(item)));
match this.fut.as_mut().as_pin_mut().unwrap().poll(cx) {
Poll::Ready(res) => {
this.fut.set(None);
ready!(slf.handle_result(res, cx));
}
Poll::Pending => {
slf.shared.inflight.set(slf.shared.inflight.get() + 1)
}
}
} else {
slf.spawn_service_call(this.service.call(item));
}
}
DispatcherState::Processing => {
let item = match ready!(slf.poll_service(&this.service, cx)) {
PollService::Ready => {
if state.is_write_backpressure_enabled() {
if !state.is_write_ready() {
// instruct write task to notify dispatcher when data is flushed
state.dsp_enable_write_backpressure(cx.waker());
slf.st.set(DispatcherState::WrWaitReady);
slf.st.set(DispatcherState::Backpressure);
DispatchItem::WBackPressureEnabled
} else if state.is_read_ready() {
// decode incoming bytes if buffer is ready
@ -324,7 +265,39 @@ where
match this.fut.as_mut().as_pin_mut().unwrap().poll(cx) {
Poll::Ready(res) => {
this.fut.set(None);
ready!(slf.handle_result(res, cx));
ready!(slf.handle_result(res));
}
Poll::Pending => {
slf.shared.inflight.set(slf.shared.inflight.get() + 1)
}
}
} else {
slf.spawn_service_call(this.service.call(item));
}
}
// handle write back-pressure
DispatcherState::Backpressure => {
let item = match ready!(slf.poll_service(&this.service, cx)) {
PollService::Ready => {
if state.is_write_ready() {
slf.st.set(DispatcherState::Processing);
DispatchItem::WBackPressureDisabled
} else {
return Poll::Pending;
}
}
PollService::Item(item) => item,
PollService::ServiceError => continue,
};
// call service
if this.fut.is_none() {
// optimize first service call
this.fut.set(Some(this.service.call(item)));
match this.fut.as_mut().as_pin_mut().unwrap().poll(cx) {
Poll::Ready(res) => {
this.fut.set(None);
ready!(slf.handle_result(res));
}
Poll::Pending => {
slf.shared.inflight.set(slf.shared.inflight.get() + 1)
@ -386,14 +359,12 @@ where
fn handle_result(
&self,
item: Result<Option<<U as Encoder>::Item>, S::Error>,
cx: &mut Context<'_>,
) -> Poll<()> {
match self.state.write_result(item, &self.shared.codec) {
Ok(true) => (),
Ok(false) => {
// instruct write task to notify dispatcher when data is flushed
self.state.dsp_enable_write_backpressure(cx.waker());
self.st.set(DispatcherState::WrEnabled);
self.state.enable_write_backpressure();
return Poll::Pending;
}
Err(Either::Left(err)) => {
@ -515,6 +486,8 @@ where
mod tests {
use bytes::Bytes;
use futures::future::FutureExt;
use rand::Rng;
use std::sync::{Arc, Mutex};
use crate::codec::BytesCodec;
use crate::rt::time::delay_for;
@ -540,7 +513,7 @@ mod tests {
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
let timer = Timer::default();
let ka_timeout = 30;
let ka_timeout = 1;
let ka_updated = timer.now();
let state = State::new();
let io = Rc::new(RefCell::new(io));
@ -550,6 +523,9 @@ mod tests {
inflight: Cell::new(0),
});
let expire = ka_updated + Duration::from_millis(500);
timer.register(expire, expire, &state);
crate::rt::spawn(ReadTask::new(io.clone(), state.clone()));
crate::rt::spawn(WriteTask::new(io.clone(), state.clone()));
@ -670,4 +646,117 @@ mod tests {
client.close().await;
assert!(client.is_server_dropped());
}
#[ntex_rt::test]
async fn test_write_backpressure() {
let (client, server) = Io::create();
// do not allow to write to socket
client.remote_buffer_cap(0);
client.write("GET /test HTTP/1\r\n\r\n");
let data = Arc::new(Mutex::new(RefCell::new(Vec::new())));
let data2 = data.clone();
let (disp, state) = Dispatcher::debug(
server,
BytesCodec,
crate::fn_service(move |msg: DispatchItem<BytesCodec>| {
let data = data2.clone();
async move {
match msg {
DispatchItem::Item(_) => {
data.lock().unwrap().borrow_mut().push(0);
let bytes = rand::thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
.take(65_536)
.map(char::from)
.collect::<String>();
return Ok::<_, ()>(Some(Bytes::from(bytes)));
}
DispatchItem::WBackPressureEnabled => {
data.lock().unwrap().borrow_mut().push(1);
}
DispatchItem::WBackPressureDisabled => {
data.lock().unwrap().borrow_mut().push(2);
}
_ => (),
}
Ok(None)
}
}),
);
crate::rt::spawn(disp.map(|_| ()));
let buf = client.read_any();
assert_eq!(buf, Bytes::from_static(b""));
client.write("GET /test HTTP/1\r\n\r\n");
delay_for(Duration::from_millis(25)).await;
// buf must be consumed
assert_eq!(client.remote_buffer(|buf| buf.len()), 0);
// response message
assert!(!state.is_write_ready());
assert_eq!(state.with_write_buf(|buf| buf.len()), 65536);
client.remote_buffer_cap(10240);
delay_for(Duration::from_millis(50)).await;
assert_eq!(state.with_write_buf(|buf| buf.len()), 55296);
client.remote_buffer_cap(45056);
delay_for(Duration::from_millis(50)).await;
assert_eq!(state.with_write_buf(|buf| buf.len()), 10240);
// backpressure disabled
assert!(state.is_write_ready());
assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1, 2]);
}
#[ntex_rt::test]
async fn test_keepalive() {
env_logger::init();
let (client, server) = Io::create();
// do not allow to write to socket
client.remote_buffer_cap(1024);
client.write("GET /test HTTP/1\r\n\r\n");
let data = Arc::new(Mutex::new(RefCell::new(Vec::new())));
let data2 = data.clone();
let (disp, state) = Dispatcher::debug(
server,
BytesCodec,
crate::fn_service(move |msg: DispatchItem<BytesCodec>| {
let data = data2.clone();
async move {
match msg {
DispatchItem::Item(bytes) => {
data.lock().unwrap().borrow_mut().push(0);
return Ok::<_, ()>(Some(bytes.freeze()));
}
DispatchItem::KeepAliveTimeout => {
data.lock().unwrap().borrow_mut().push(1);
}
_ => (),
}
Ok(None)
}
}),
);
crate::rt::spawn(disp.map(|_| ()));
let state = state.disconnect_timeout(1);
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"));
delay_for(Duration::from_millis(3100)).await;
// write side must be closed, dispatcher should fail with keep-alive
let flags = state.flags();
assert!(state.is_io_err());
assert!(state.is_io_shutdown());
assert!(flags.contains(crate::framed::state::Flags::IO_SHUTDOWN));
assert!(client.is_closed());
assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1]);
}
}

View file

@ -33,12 +33,10 @@ bitflags::bitflags! {
/// read buffer is full
const RD_BUF_FULL = 0b0000_1000_0000;
/// write task is ready
const WR_READY = 0b0001_0000_0000;
/// write buffer is full
const WR_NOT_READY = 0b0010_0000_0000;
const WR_BACKPRESSURE = 0b0000_0001_0000_0000;
const ST_DSP_ERR = 0b0001_0000_0000_0000;
const ST_DSP_ERR = 0b0001_0000_0000_0000;
}
}
@ -205,40 +203,20 @@ impl State {
/// read task must be paused if service is not ready (RD_PAUSED)
pub(super) fn is_read_paused(&self) -> bool {
self.0.flags.get().intersects(Flags::RD_PAUSED)
self.0.flags.get().contains(Flags::RD_PAUSED)
}
#[inline]
/// Check if write back-pressure is disabled
pub fn is_write_backpressure_disabled(&self) -> bool {
let mut flags = self.0.flags.get();
if flags.contains(Flags::WR_READY) {
flags.remove(Flags::WR_READY);
self.0.flags.set(flags);
true
} else {
false
}
}
#[inline]
/// Check if write back-pressure is enabled
pub fn is_write_backpressure_enabled(&self) -> bool {
let mut flags = self.0.flags.get();
if flags.contains(Flags::WR_READY) {
flags.remove(Flags::WR_READY);
self.0.flags.set(flags);
true
} else {
false
}
/// Check if write task is ready
pub fn is_write_ready(&self) -> bool {
!self.0.flags.get().contains(Flags::WR_BACKPRESSURE)
}
#[inline]
/// Enable write back-persurre
pub fn enable_write_backpressure(&self) {
log::trace!("enable write back-pressure");
self.insert_flags(Flags::WR_NOT_READY)
self.insert_flags(Flags::WR_BACKPRESSURE);
}
#[inline]
@ -335,13 +313,16 @@ impl State {
self.0.read_task.register(waker);
}
pub(super) fn update_write_task(&self) {
let mut flags = self.0.flags.get();
if flags.contains(Flags::WR_NOT_READY) {
flags.remove(Flags::WR_NOT_READY);
flags.insert(Flags::WR_READY);
self.0.flags.set(flags);
self.0.dispatch_task.wake();
pub(super) fn update_write_task(&self, ready: bool) {
if ready {
let mut flags = self.0.flags.get();
if flags.contains(Flags::WR_BACKPRESSURE) {
flags.remove(Flags::WR_BACKPRESSURE);
self.0.flags.set(flags);
self.0.dispatch_task.wake();
}
} else {
self.insert_flags(Flags::WR_BACKPRESSURE);
}
}
@ -383,7 +364,7 @@ impl State {
///
/// Write task must be waken up separately.
pub fn dsp_enable_write_backpressure(&self, waker: &Waker) {
self.insert_flags(Flags::WR_NOT_READY);
self.insert_flags(Flags::WR_BACKPRESSURE);
self.0.dispatch_task.register(waker);
}

View file

@ -103,9 +103,7 @@ where
match result {
Poll::Ready(Ok(_)) | Poll::Pending => {
if len < HW {
this.state.update_write_task()
}
this.state.update_write_task(len < HW)
}
Poll::Ready(Err(err)) => {
log::trace!("error during sending data: {:?}", err);