refine write backpressure for h1 dispatcher

This commit is contained in:
Nikolay Kim 2020-04-10 12:32:28 +06:00
parent becdc529bb
commit 95d1b4a43d
3 changed files with 140 additions and 56 deletions

View file

@ -4,7 +4,7 @@
* ntex::http: Fix handling of large http messages * ntex::http: Fix handling of large http messages
* ntex::http: Refine read back-pressure for h1 dispatcher * ntex::http: Refine read/write back-pressure for h1 dispatcher
## [0.1.6] - 2020-04-09 ## [0.1.6] - 2020-04-09

View file

@ -24,11 +24,11 @@ use super::codec::Codec;
use super::payload::{Payload, PayloadSender, PayloadStatus}; use super::payload::{Payload, PayloadSender, PayloadStatus};
use super::{Message, MessageType, MAX_BUFFER_SIZE}; use super::{Message, MessageType, MAX_BUFFER_SIZE};
const READ_BUFFER_SIZE: usize = 4096;
const READ_LW_BUFFER_SIZE: usize = 1024; const READ_LW_BUFFER_SIZE: usize = 1024;
const WRITE_BUFFER_SIZE: usize = 8192; const READ_HW_BUFFER_SIZE: usize = 4096;
const WRITE_LW_BUFFER_SIZE: usize = 2048; const WRITE_LW_BUFFER_SIZE: usize = 2048;
const HW_BUFFER_SIZE: usize = 32_768; const WRITE_HW_BUFFER_SIZE: usize = 8192;
const BUFFER_SIZE: usize = 32_768;
const LW_PIPELINED_MESSAGES: usize = 1; const LW_PIPELINED_MESSAGES: usize = 1;
bitflags! { bitflags! {
@ -175,7 +175,7 @@ where
config, config,
stream, stream,
codec, codec,
BytesMut::with_capacity(READ_BUFFER_SIZE), BytesMut::with_capacity(READ_HW_BUFFER_SIZE),
timeout, timeout,
peer_addr, peer_addr,
on_connect, on_connect,
@ -212,7 +212,7 @@ where
call: CallState::Io, call: CallState::Io,
upgrade: None, upgrade: None,
inner: InnerDispatcher { inner: InnerDispatcher {
write_buf: BytesMut::with_capacity(WRITE_BUFFER_SIZE), write_buf: BytesMut::with_capacity(WRITE_HW_BUFFER_SIZE),
payload: None, payload: None,
send_payload: None, send_payload: None,
error: None, error: None,
@ -565,14 +565,15 @@ where
let mut flushed = false; let mut flushed = false;
while let Some(ref mut stream) = self.send_payload { while let Some(ref mut stream) = self.send_payload {
// resize write buffer
let len = self.write_buf.len(); let len = self.write_buf.len();
if len < BUFFER_SIZE {
// increase write buffer
let remaining = self.write_buf.capacity() - len; let remaining = self.write_buf.capacity() - len;
if remaining < WRITE_LW_BUFFER_SIZE { if remaining < WRITE_LW_BUFFER_SIZE {
self.write_buf.reserve(HW_BUFFER_SIZE - remaining); self.write_buf.reserve(BUFFER_SIZE - remaining);
} }
if len < HW_BUFFER_SIZE {
match stream.poll_next_chunk(cx) { match stream.poll_next_chunk(cx) {
Poll::Ready(Some(Ok(item))) => { Poll::Ready(Some(Ok(item))) => {
flushed = false; flushed = false;
@ -604,7 +605,7 @@ where
// space in buffer // space in buffer
flushed = true; flushed = true;
self.poll_flush(cx)?; self.poll_flush(cx)?;
if self.write_buf.len() >= HW_BUFFER_SIZE { if self.write_buf.len() >= BUFFER_SIZE {
return Ok(PollWrite::Pending); return Ok(PollWrite::Pending);
} }
} }
@ -615,7 +616,7 @@ where
} }
// we have enough space in write buffer // we have enough space in write buffer
if self.write_buf.len() < HW_BUFFER_SIZE { if self.write_buf.len() < BUFFER_SIZE {
Ok(PollWrite::AllowNext) Ok(PollWrite::AllowNext)
} else { } else {
Ok(PollWrite::Pending) Ok(PollWrite::Pending)
@ -647,9 +648,10 @@ where
let buf = &mut self.read_buf; let buf = &mut self.read_buf;
let mut updated = false; let mut updated = false;
while buf.len() < MAX_BUFFER_SIZE { while buf.len() < MAX_BUFFER_SIZE {
// increase read buffer size
let remaining = buf.capacity() - buf.len(); let remaining = buf.capacity() - buf.len();
if remaining < READ_LW_BUFFER_SIZE { if remaining < READ_LW_BUFFER_SIZE {
buf.reserve(HW_BUFFER_SIZE - remaining); buf.reserve(BUFFER_SIZE);
} }
match read(cx, io, buf) { match read(cx, io, buf) {
@ -915,7 +917,9 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use bytes::Bytes;
use futures::future::{lazy, ok, Future, FutureExt}; use futures::future::{lazy, ok, Future, FutureExt};
use futures::StreamExt;
use rand::Rng; use rand::Rng;
use std::rc::Rc; use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@ -925,7 +929,7 @@ mod tests {
use super::*; use super::*;
use crate::http::config::{DispatcherConfig, ServiceConfig}; use crate::http::config::{DispatcherConfig, ServiceConfig};
use crate::http::h1::{ClientCodec, ExpectHandler, UpgradeHandler}; use crate::http::h1::{ClientCodec, ExpectHandler, UpgradeHandler};
use crate::http::{ResponseHead, StatusCode}; use crate::http::{body, Request, ResponseHead, StatusCode};
use crate::rt::time::delay_for; use crate::rt::time::delay_for;
use crate::service::IntoService; use crate::service::IntoService;
use crate::testing::Io; use crate::testing::Io;
@ -1119,9 +1123,14 @@ mod tests {
let (client, server) = Io::create(); let (client, server) = Io::create();
client.remote_buffer_cap(4096); client.remote_buffer_cap(4096);
spawn_h1(server, move |_| { spawn_h1(server, move |mut req: Request| {
mark2.store(true, Ordering::Relaxed); let m = mark2.clone();
async { async move {
// read one chunk
let mut pl = req.take_payload();
let _ = pl.next().await.unwrap().unwrap();
m.store(true, Ordering::Relaxed);
// sleep
delay_for(Duration::from_secs(999999)).await; delay_for(Duration::from_secs(999999)).await;
Ok::<_, io::Error>(Response::Ok().finish()) Ok::<_, io::Error>(Response::Ok().finish())
} }
@ -1129,7 +1138,6 @@ mod tests {
client.write("GET /test HTTP/1.1\r\nContent-Length: 1048576\r\n\r\n"); client.write("GET /test HTTP/1.1\r\nContent-Length: 1048576\r\n\r\n");
delay_for(Duration::from_millis(50)).await; delay_for(Duration::from_millis(50)).await;
assert!(mark.load(Ordering::Relaxed));
// buf must be consumed // buf must be consumed
assert_eq!(client.remote_buffer(|buf| buf.len()), 0); assert_eq!(client.remote_buffer(|buf| buf.len()), 0);
@ -1139,6 +1147,107 @@ mod tests {
client.write(random_bytes); client.write(random_bytes);
delay_for(Duration::from_millis(50)).await; delay_for(Duration::from_millis(50)).await;
assert!(client.remote_buffer(|buf| buf.len()) > 1048576 - MAX_BUFFER_SIZE * 2); assert!(client.remote_buffer(|buf| buf.len()) > 1048576 - MAX_BUFFER_SIZE * 3);
assert!(mark.load(Ordering::Relaxed));
}
#[ntex_rt::test]
async fn test_write_backpressure() {
let num = Arc::new(AtomicUsize::new(0));
let num2 = num.clone();
struct Stream(Arc<AtomicUsize>);
impl body::MessageBody for Stream {
fn size(&self) -> body::BodySize {
body::BodySize::Stream
}
fn poll_next_chunk(
&mut self,
_: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Box<dyn std::error::Error>>>> {
let data = rand::thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
.take(65_536)
.collect::<String>();
self.0.fetch_add(data.len(), Ordering::Relaxed);
Poll::Ready(Some(Ok(Bytes::from(data))))
}
}
let (client, server) = Io::create();
let mut h1 = h1(server, move |_| {
let n = num2.clone();
async move { Ok::<_, io::Error>(Response::Ok().message_body(Stream(n.clone()))) }
.boxed_local()
});
// do not allow to write to socket
client.remote_buffer_cap(0);
client.write("GET /test HTTP/1.1\r\n\r\n");
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
// buf must be consumed
assert_eq!(client.remote_buffer(|buf| buf.len()), 0);
// amount of generated data
assert_eq!(num.load(Ordering::Relaxed), 65_536);
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
assert_eq!(num.load(Ordering::Relaxed), 65_536);
// response message + chunking encoding
assert_eq!(h1.inner.write_buf.len(), 65629);
client.remote_buffer_cap(65536);
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
assert_eq!(num.load(Ordering::Relaxed), 65_536 * 2);
}
#[ntex_rt::test]
async fn test_disconnect_during_response_body_pending() {
struct Stream(bool);
impl body::MessageBody for Stream {
fn size(&self) -> body::BodySize {
body::BodySize::Sized(2048)
}
fn poll_next_chunk(
&mut self,
_: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Box<dyn std::error::Error>>>> {
if self.0 {
Poll::Pending
} else {
self.0 = true;
let data = rand::thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
.take(1024)
.collect::<String>();
Poll::Ready(Some(Ok(Bytes::from(data))))
}
}
}
let (client, server) = Io::create();
client.remote_buffer_cap(4096);
let mut h1 = h1(server, |_| {
ok::<_, io::Error>(Response::Ok().message_body(Stream(false)))
});
client.write("GET /test HTTP/1.1\r\n\r\n");
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
// buf must be consumed
assert_eq!(client.remote_buffer(|buf| buf.len()), 0);
let mut decoder = ClientCodec::default();
let mut buf = client.read().await.unwrap();
assert!(load(&mut decoder, &mut buf).status.is_success());
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
client.close().await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready());
} }
} }

View file

@ -12,16 +12,16 @@ use crate::http::error::PayloadError;
use crate::task::LocalWaker; use crate::task::LocalWaker;
/// max buffer size 32k /// max buffer size 32k
pub(crate) const MAX_BUFFER_SIZE: usize = 32_768; const MAX_BUFFER_SIZE: usize = 32_768;
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum PayloadStatus { pub(super) enum PayloadStatus {
Read, Read,
Pause, Pause,
Dropped, Dropped,
} }
/// Buffered stream of bytes chunks /// Buffered stream of byte chunks
/// ///
/// Payload stores chunks in a vector. First chunk can be received with /// Payload stores chunks in a vector. First chunk can be received with
/// `.readany()` method. Payload stream is not thread safe. Payload does not /// `.readany()` method. Payload stream is not thread safe. Payload does not
@ -42,7 +42,7 @@ impl Payload {
/// * `PayloadSender` - *Sender* side of the stream /// * `PayloadSender` - *Sender* side of the stream
/// ///
/// * `Payload` - *Receiver* side of the stream /// * `Payload` - *Receiver* side of the stream
pub fn create(eof: bool) -> (PayloadSender, Payload) { pub(super) fn create(eof: bool) -> (PayloadSender, Payload) {
let shared = Rc::new(RefCell::new(Inner::new(eof))); let shared = Rc::new(RefCell::new(Inner::new(eof)));
( (
@ -61,18 +61,6 @@ impl Payload {
} }
} }
/// Length of the data in this payload
#[cfg(test)]
pub fn len(&self) -> usize {
self.inner.borrow().len()
}
/// Is payload empty
#[cfg(test)]
pub fn is_empty(&self) -> bool {
self.inner.borrow().len() == 0
}
/// Put unused data back to payload /// Put unused data back to payload
#[inline] #[inline]
pub fn unread_data(&mut self, data: Bytes) { pub fn unread_data(&mut self, data: Bytes) {
@ -100,34 +88,30 @@ impl Stream for Payload {
} }
/// Sender part of the payload stream /// Sender part of the payload stream
pub struct PayloadSender { pub(super) struct PayloadSender {
inner: Weak<RefCell<Inner>>, inner: Weak<RefCell<Inner>>,
} }
impl PayloadSender { impl PayloadSender {
#[inline] pub(super) fn set_error(&mut self, err: PayloadError) {
pub fn set_error(&mut self, err: PayloadError) {
if let Some(shared) = self.inner.upgrade() { if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().set_error(err) shared.borrow_mut().set_error(err)
} }
} }
#[inline] pub(super) fn feed_eof(&mut self) {
pub fn feed_eof(&mut self) {
if let Some(shared) = self.inner.upgrade() { if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().feed_eof() shared.borrow_mut().feed_eof()
} }
} }
#[inline] pub(super) fn feed_data(&mut self, data: Bytes) {
pub fn feed_data(&mut self, data: Bytes) {
if let Some(shared) = self.inner.upgrade() { if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().feed_data(data) shared.borrow_mut().feed_data(data)
} }
} }
#[inline] pub(super) fn need_read(&self, cx: &mut Context<'_>) -> PayloadStatus {
pub fn need_read(&self, cx: &mut Context<'_>) -> PayloadStatus {
// we check need_read only if Payload (other side) is alive, // we check need_read only if Payload (other side) is alive,
// otherwise always return true (consume payload) // otherwise always return true (consume payload)
if let Some(shared) = self.inner.upgrade() { if let Some(shared) = self.inner.upgrade() {
@ -167,17 +151,14 @@ impl Inner {
} }
} }
#[inline]
fn set_error(&mut self, err: PayloadError) { fn set_error(&mut self, err: PayloadError) {
self.err = Some(err); self.err = Some(err);
} }
#[inline]
fn feed_eof(&mut self) { fn feed_eof(&mut self) {
self.eof = true; self.eof = true;
} }
#[inline]
fn feed_data(&mut self, data: Bytes) { fn feed_data(&mut self, data: Bytes) {
self.len += data.len(); self.len += data.len();
self.items.push_back(data); self.items.push_back(data);
@ -187,11 +168,6 @@ impl Inner {
} }
} }
#[cfg(test)]
fn len(&self) -> usize {
self.len
}
fn readany( fn readany(
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -233,8 +209,7 @@ mod tests {
let (_, mut payload) = Payload::create(false); let (_, mut payload) = Payload::create(false);
payload.unread_data(Bytes::from("data")); payload.unread_data(Bytes::from("data"));
assert!(!payload.is_empty()); assert_eq!(payload.inner.borrow().len, 4);
assert_eq!(payload.len(), 4);
assert_eq!( assert_eq!(
Bytes::from("data"), Bytes::from("data"),