Refine read back-pressure for h1 dispatcher

This commit is contained in:
Nikolay Kim 2020-04-10 11:14:24 +06:00
parent 46ce12f07a
commit becdc529bb
6 changed files with 103 additions and 24 deletions

View file

@ -8,10 +8,10 @@ members = [
"ntex-service",
]
[patch.crates-io]
ntex = { path = "ntex" }
ntex-codec = { path = "ntex-codec" }
ntex-router = { path = "ntex-router" }
ntex-rt = { path = "ntex-rt" }
ntex-rt-macros = { path = "ntex-rt-macros" }
ntex-service = { path = "ntex-service" }
#[patch.crates-io]
#ntex = { path = "ntex" }
#ntex-codec = { path = "ntex-codec" }
#ntex-router = { path = "ntex-router" }
#ntex-rt = { path = "ntex-rt" }
#ntex-rt-macros = { path = "ntex-rt-macros" }
#ntex-service = { path = "ntex-service" }

View file

@ -1,5 +1,11 @@
# Changes
## [0.1.7] - 2020-04-xx
* ntex::http: Fix handling of large http messages
* ntex::http: Refine read back-pressure for h1 dispatcher
## [0.1.6] - 2020-04-09
* ntex::web: Allow to add multiple services at once

View file

@ -14,7 +14,8 @@ use crate::http::header::HeaderMap;
use crate::http::message::{ConnectionType, ResponseHead};
use crate::http::request::Request;
const MAX_BUFFER_SIZE: usize = 131_072;
use super::MAX_BUFFER_SIZE;
const MAX_HEADERS: usize = 96;
/// Incoming message decoder
@ -209,7 +210,13 @@ impl MessageType for Request {
(len, method, uri, version, req.headers.len())
}
httparse::Status::Partial => return Ok(None),
httparse::Status::Partial => {
if src.len() >= MAX_BUFFER_SIZE {
trace!("MAX_BUFFER_SIZE unprocessed data reached, closing");
return Err(ParseError::TooLarge);
}
return Ok(None);
}
}
};
@ -228,9 +235,6 @@ impl MessageType for Request {
PayloadLength::None => {
if method == Method::CONNECT {
PayloadType::Stream(PayloadDecoder::eof())
} else if src.len() >= MAX_BUFFER_SIZE {
trace!("MAX_BUFFER_SIZE unprocessed data reached, closing");
return Err(ParseError::TooLarge);
} else {
PayloadType::None
}
@ -284,7 +288,14 @@ impl MessageType for ResponseHead {
(len, version, status, res.headers.len())
}
httparse::Status::Partial => return Ok(None),
httparse::Status::Partial => {
return if src.len() >= MAX_BUFFER_SIZE {
error!("MAX_BUFFER_SIZE unprocessed data reached, closing");
Err(ParseError::TooLarge)
} else {
Ok(None)
}
}
}
};
@ -300,9 +311,6 @@ impl MessageType for ResponseHead {
} else if status == StatusCode::SWITCHING_PROTOCOLS {
// switching protocol or connect
PayloadType::Stream(PayloadDecoder::eof())
} else if src.len() >= MAX_BUFFER_SIZE {
error!("MAX_BUFFER_SIZE unprocessed data reached, closing");
return Err(ParseError::TooLarge);
} else {
// for HTTP/1.0 read to eof and close connection
if msg.version == Version::HTTP_10 {

View file

@ -22,7 +22,7 @@ use crate::Service;
use super::codec::Codec;
use super::payload::{Payload, PayloadSender, PayloadStatus};
use super::{Message, MessageType};
use super::{Message, MessageType, MAX_BUFFER_SIZE};
const READ_BUFFER_SIZE: usize = 4096;
const READ_LW_BUFFER_SIZE: usize = 1024;
@ -646,7 +646,7 @@ where
let io = self.io.as_mut().unwrap();
let buf = &mut self.read_buf;
let mut updated = false;
loop {
while buf.len() < MAX_BUFFER_SIZE {
let remaining = buf.capacity() - buf.len();
if remaining < READ_LW_BUFFER_SIZE {
buf.reserve(HW_BUFFER_SIZE - remaining);
@ -916,15 +916,16 @@ where
#[cfg(test)]
mod tests {
use futures::future::{lazy, ok, Future, FutureExt};
use rand::Rng;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use super::*;
use crate::http::config::{DispatcherConfig, ServiceConfig};
use crate::http::h1::{ClientCodec, ExpectHandler, UpgradeHandler};
use crate::http::ResponseHead;
use crate::http::{ResponseHead, StatusCode};
use crate::rt::time::delay_for;
use crate::service::IntoService;
use crate::testing::Io;
@ -992,7 +993,7 @@ mod tests {
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
assert!(h1.inner.flags.contains(Flags::SHUTDOWN));
client
.read_buffer(|buf| assert_eq!(&buf[..26], b"HTTP/1.1 400 Bad Request\r\n"));
.local_buffer(|buf| assert_eq!(&buf[..26], b"HTTP/1.1 400 Bad Request\r\n"));
client.close().await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready());
@ -1088,4 +1089,56 @@ mod tests {
// all request must be handled
assert_eq!(num.load(Ordering::Relaxed), 3);
}
#[ntex_rt::test]
// A request whose header section never terminates and grows past
// MAX_BUFFER_SIZE (65_536, see h1 module) must be rejected with
// ParseError::TooLarge, which the dispatcher surfaces as a 400 response
// followed by shutdown.
async fn test_read_large_message() {
let (client, server) = Io::create();
// cap the simulated remote write buffer so the dispatcher reads in chunks
client.remote_buffer_cap(4096);
let mut h1 = h1(server, |_| ok::<_, io::Error>(Response::Ok().finish()));
let mut decoder = ClientCodec::default();
// 70_000 bytes of header data — above MAX_BUFFER_SIZE, so parsing can
// never complete and the size guard must trip
let data = rand::thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
.take(70_000)
.collect::<String>();
// note: the header line is deliberately left unterminated (no "\r\n\r\n"),
// keeping the parser in Status::Partial while the buffer fills up
client.write("GET /test HTTP/1.1\r\nContent-Length: ");
client.write(data);
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
// dispatcher must have decided to shut the connection down
assert!(h1.inner.flags.contains(Flags::SHUTDOWN));
// the client must receive a 400 Bad Request before the close
let mut buf = client.read().await.unwrap();
assert_eq!(load(&mut decoder, &mut buf).status, StatusCode::BAD_REQUEST);
}
#[ntex_rt::test]
// With a service that never completes, the dispatcher must stop draining
// the io once its read buffer holds MAX_BUFFER_SIZE bytes (see the
// `while buf.len() < MAX_BUFFER_SIZE` loop in poll_read), leaving the
// rest of a large payload unconsumed in the remote buffer.
async fn test_read_backpressure() {
// flag flipped by the service so we know the request reached it
let mark = Arc::new(AtomicBool::new(false));
let mark2 = mark.clone();
let (client, server) = Io::create();
client.remote_buffer_cap(4096);
spawn_h1(server, move |_| {
mark2.store(true, Ordering::Relaxed);
async {
// effectively never respond, so the payload is never consumed
delay_for(Duration::from_secs(999999)).await;
Ok::<_, io::Error>(Response::Ok().finish())
}
});
client.write("GET /test HTTP/1.1\r\nContent-Length: 1048576\r\n\r\n");
delay_for(Duration::from_millis(50)).await;
assert!(mark.load(Ordering::Relaxed));
// the small request head itself must be fully consumed
assert_eq!(client.remote_buffer(|buf| buf.len()), 0);
// back-pressure: the dispatcher should drain no more than roughly
// MAX_BUFFER_SIZE of the 1 MiB body before it stops reading
let random_bytes: Vec<u8> = (0..1048576).map(|_| rand::random::<u8>()).collect();
client.write(random_bytes);
delay_for(Duration::from_millis(50)).await;
// most of the body must still be sitting unread in the remote buffer
assert!(client.remote_buffer(|buf| buf.len()) > 1048576 - MAX_BUFFER_SIZE * 2);
}
}

View file

@ -20,6 +20,8 @@ pub use self::upgrade::UpgradeHandler;
pub(super) use self::dispatcher::Dispatcher;
const MAX_BUFFER_SIZE: usize = 65_536;
#[derive(Debug)]
/// Codec message
pub enum Message<T> {

View file

@ -123,7 +123,7 @@ impl Io {
}
/// Access read buffer.
pub fn read_buffer<F, R>(&self, f: F) -> R
pub fn local_buffer<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut BytesMut) -> R,
{
@ -132,7 +132,17 @@ impl Io {
f(&mut ch.buf)
}
/// Access write buffer.
/// Access remote buffer.
pub fn remote_buffer<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut BytesMut) -> R,
{
let guard = self.remote.lock().unwrap();
let mut ch = guard.borrow_mut();
f(&mut ch.buf)
}
/// Closed remote side.
pub async fn close(&self) {
{
let guard = self.remote.lock().unwrap();
@ -143,7 +153,7 @@ impl Io {
delay_for(time::Duration::from_millis(35)).await;
}
/// Add extra data to the buffer and notify reader
/// Add extra data to the remote buffer and notify reader
pub fn write<T: AsRef<[u8]>>(&self, data: T) {
let guard = self.remote.lock().unwrap();
let mut write = guard.borrow_mut();