optimize http/1 dispatcher

This commit is contained in:
Nikolay Kim 2020-07-29 03:23:32 +06:00
parent deeccb9f60
commit b0aab5aaa1
4 changed files with 241 additions and 195 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.21] - 2020-07-29
* Optimize http/1 dispatcher
## [0.1.20] - 2020-07-06
* ntex::util: Add `Buffer` service

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.1.20"
version = "0.1.21"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"

View file

@ -223,7 +223,7 @@ mod tests {
let srv = Connector::default();
let result = srv.connect("").await;
assert!(result.is_err());
let result = srv.connect("localhost-111").await;
let result = srv.connect("localhost:99999").await;
assert!(result.is_err());
let srv = Connector::default();

View file

@ -13,7 +13,7 @@ use pin_project::pin_project;
use crate::codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts};
use crate::http::body::{Body, BodySize, MessageBody, ResponseBody};
use crate::http::config::DispatcherConfig;
use crate::http::error::{DispatchError, PayloadError, ResponseError};
use crate::http::error::{DispatchError, ParseError, PayloadError, ResponseError};
use crate::http::helpers::DataFactory;
use crate::http::request::Request;
use crate::http::response::Response;
@ -32,7 +32,7 @@ const BUFFER_SIZE: usize = 32_768;
const LW_PIPELINED_MESSAGES: usize = 1;
bitflags! {
pub struct Flags: u8 {
pub struct Flags: u16 {
/// We parsed one complete request message
const STARTED = 0b0000_0001;
/// Keep-alive is enabled on current connection
@ -49,6 +49,8 @@ bitflags! {
const SHUTDOWN_TM = 0b0100_0000;
/// Connection is upgraded
const UPGRADE = 0b1000_0000;
/// All data has been read
const READ_EOF = 0b0001_0000_0000;
}
}
@ -115,13 +117,7 @@ enum PollWrite {
Pending,
/// waiting for response stream (app response)
/// or write buffer is full
PendingRespnse,
}
#[derive(Copy, Clone, PartialEq, Eq)]
enum PollRead {
NoUpdates,
HasUpdates,
PendingResponse,
}
#[pin_project(project = CallStateProject)]
@ -188,9 +184,9 @@ where
) -> Self {
let keepalive = config.keep_alive_enabled();
let flags = if keepalive {
Flags::KEEPALIVE
Flags::KEEPALIVE | Flags::READ_EOF
} else {
Flags::empty()
Flags::READ_EOF
};
// keep-alive timer
@ -255,47 +251,61 @@ where
return this.inner.poll_shutdown(cx);
}
// process incoming bytes stream
let mut not_completed = !this.inner.poll_read(cx)?;
this.inner.decode_payload()?;
loop {
// process incoming stream
this.inner.poll_read(cx)?;
// process incoming bytes stream, but only if
// previous iteration didn't read the whole buffer
if not_completed {
not_completed = !this.inner.poll_read(cx)?;
}
let st = match this.call.project() {
CallStateProject::Service(mut fut) => loop {
// we have to loop because of
// read back-pressure, check Poll::Pending processing
match fut.poll(cx) {
Poll::Ready(result) => match result {
Ok(res) => break this.inner.process_response(res.into())?,
Err(e) => {
let res: Response = e.into();
break this.inner.process_response(
res.map_body(|_, body| body.into_body()),
)?;
}
},
Poll::Pending => {
// if read-backpressure is enabled, we might need
// to read more data (ie service future can wait for payload data)
if this.inner.payload.is_some()
&& this.inner.poll_read(cx)? == PollRead::HasUpdates
{
// poll_request has read more data, try
// to poll service future again
CallStateProject::Service(mut fut) => {
let has_payload = this.inner.payload.is_some();
// restore consumed future
this = self.as_mut().project();
fut = {
match this.call.project() {
CallStateProject::Service(fut) => fut,
_ => panic!(),
loop {
// we have to loop because of
// read back-pressure, check Poll::Pending processing
match fut.poll(cx) {
Poll::Ready(result) => match result {
Ok(res) => {
break this.inner.process_response(res.into())?
}
Err(e) => {
let res: Response = e.into();
break this.inner.process_response(
res.map_body(|_, body| body.into_body()),
)?;
}
},
Poll::Pending => {
// if read back-pressure is enabled, we might need
// to read more data (i.e. the service future can wait for payload data)
if has_payload && not_completed {
// read more from io stream
not_completed = !this.inner.poll_read(cx)?;
// more payload chunks has been decoded
if this.inner.decode_payload()? {
// restore consumed future
this = self.as_mut().project();
fut = {
match this.call.project() {
CallStateProject::Service(fut) => fut,
_ => panic!(),
}
};
continue;
}
};
continue;
}
break CallProcess::Pending;
}
break CallProcess::Pending;
}
}
},
}
// handle EXPECT call
CallStateProject::Expect(fut) => match fut.poll(cx) {
Poll::Ready(result) => match result {
@ -340,7 +350,13 @@ where
}
CallProcess::Io => {
// service call queue is empty, we can process next request
match this.inner.poll_write(cx)? {
let write = if !this.inner.flags.contains(Flags::STARTED) {
PollWrite::AllowNext
} else {
this.inner.decode_payload()?;
this.inner.poll_write(cx)?
};
match write {
PollWrite::AllowNext => {
match this.inner.process_messages(CallProcess::Io)? {
CallProcess::Next(st) => {
@ -357,7 +373,7 @@ where
}
}
PollWrite::Pending => false,
PollWrite::PendingRespnse => {
PollWrite::PendingResponse => {
!this.inner.flags.contains(Flags::DISCONNECT)
}
}
@ -398,8 +414,16 @@ where
return if this.inner.flags.contains(Flags::SHUTDOWN) {
this.inner.poll_shutdown(cx)
} else {
if this.inner.poll_flush(cx)? {
// some data has been written to io stream
this = self.as_mut().project();
continue;
}
// keep-alive book-keeping
if this.inner.poll_keepalive(cx, processing)? {
if this.inner.ka_timer.is_some()
&& this.inner.poll_keepalive(cx, processing)?
{
this.inner.poll_shutdown(cx)
} else {
Poll::Pending
@ -490,17 +514,17 @@ where
}
/// Flush stream
fn poll_flush(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchError> {
fn poll_flush(&mut self, cx: &mut Context<'_>) -> Result<bool, DispatchError> {
let len = self.write_buf.len();
if len == 0 {
return Ok(());
return Ok(false);
}
let mut written = 0;
let mut io = self.io.as_mut().unwrap();
while written < len {
match Pin::new(self.io.as_mut().unwrap())
.poll_write(cx, &self.write_buf[written..])
{
match Pin::new(&mut io).poll_write(cx, &self.write_buf[written..]) {
Poll::Ready(Ok(n)) => {
if n == 0 {
trace!("Disconnected during flush, written {}", written);
@ -520,12 +544,12 @@ where
}
}
if written == len {
// flushed same amount as buffer, we dont need to reallocate
// flushed whole buffer, we don't need to reallocate
unsafe { self.write_buf.set_len(0) }
} else {
self.write_buf.advance(written);
}
Ok(())
Ok(written != 0)
}
fn send_response(
@ -561,8 +585,6 @@ where
}
fn poll_write(&mut self, cx: &mut Context<'_>) -> Result<PollWrite, DispatchError> {
let mut flushed = false;
while let Some(ref mut stream) = self.send_payload {
let len = self.write_buf.len();
@ -576,13 +598,11 @@ where
match stream.poll_next_chunk(cx) {
Poll::Ready(Some(Ok(item))) => {
trace!("Got response chunk: {:?}", item.len());
flushed = false;
self.codec
.encode(Message::Chunk(Some(item)), &mut self.write_buf)?;
}
Poll::Ready(None) => {
trace!("Response payload eof");
flushed = false;
self.codec
.encode(Message::Chunk(None), &mut self.write_buf)?;
self.send_payload = None;
@ -593,29 +613,16 @@ where
return Err(DispatchError::Unknown);
}
Poll::Pending => {
// response payload stream is not ready
// we can only flush
if !flushed {
self.poll_flush(cx)?;
}
return Ok(PollWrite::PendingRespnse);
// response payload stream is not ready, we can only flush
return Ok(PollWrite::PendingResponse);
}
}
} else {
// write buffer is full, try to flush and check if we have
// space in buffer
flushed = true;
self.poll_flush(cx)?;
if self.write_buf.len() >= BUFFER_SIZE {
return Ok(PollWrite::PendingRespnse);
}
// write buffer is full, we need to flush
return Ok(PollWrite::PendingResponse);
}
}
if !flushed {
self.poll_flush(cx)?;
}
// we have enough space in the write buffer
if self.write_buf.len() < BUFFER_SIZE {
Ok(PollWrite::AllowNext)
@ -625,7 +632,9 @@ where
}
/// Process one incoming requests
fn poll_read(&mut self, cx: &mut Context<'_>) -> Result<PollRead, DispatchError> {
fn poll_read(&mut self, cx: &mut Context<'_>) -> Result<bool, DispatchError> {
let mut completed = false;
// read socket data into a buf
if !self
.flags
@ -641,24 +650,26 @@ where
.map(|info| info.need_read(cx) == PayloadStatus::Read)
.unwrap_or(true)
{
return Ok(PollRead::NoUpdates);
return Ok(false);
}
// read data from socket
let io = self.io.as_mut().unwrap();
let buf = &mut self.read_buf;
let mut updated = false;
while buf.len() < MAX_BUFFER_SIZE {
// increase read buffer size
let remaining = buf.capacity() - buf.len();
if remaining < READ_LW_BUFFER_SIZE {
buf.reserve(BUFFER_SIZE);
}
// increase read buffer size
let remaining = buf.capacity() - buf.len();
if remaining < READ_LW_BUFFER_SIZE {
buf.reserve(BUFFER_SIZE);
}
while buf.len() < MAX_BUFFER_SIZE {
match Pin::new(&mut *io).poll_read_buf(cx, buf) {
Poll::Pending => break,
Poll::Pending => {
completed = true;
break;
}
Poll::Ready(Ok(n)) => {
updated = true;
if n == 0 {
trace!(
"Disconnected during read, buffer size {}",
@ -667,6 +678,7 @@ where
self.flags.insert(Flags::DISCONNECT);
break;
}
self.flags.remove(Flags::READ_EOF);
}
Poll::Ready(Err(e)) => {
trace!("Error during read: {:?}", e);
@ -676,75 +688,50 @@ where
}
}
}
if !updated {
return Ok(PollRead::NoUpdates);
}
}
let result = if self.read_buf.is_empty() {
Ok(PollRead::NoUpdates)
} else {
self.input_decode()
};
// socket is disconnected, clear read buf
if self.flags.contains(Flags::DISCONNECT) {
self.read_buf.clear();
// decode operation wont run again, so we have to
// stop payload stream
if let Some(mut payload) = self.payload.take() {
payload.feed_eof();
}
}
result
Ok(completed)
}
fn internal_error(&mut self, msg: &'static str) {
fn internal_error(&mut self, msg: &'static str) -> DispatcherMessage {
error!("{}", msg);
self.flags.insert(Flags::DISCONNECT);
self.messages.push_back(DispatcherMessage::Error(
Response::InternalServerError().finish().drop_body(),
));
self.flags.insert(Flags::DISCONNECT | Flags::READ_EOF);
self.error = Some(DispatchError::InternalError);
DispatcherMessage::Error(Response::InternalServerError().finish().drop_body())
}
fn input_decode(&mut self) -> Result<PollRead, DispatchError> {
fn decode_error(&mut self, e: ParseError) -> DispatcherMessage {
// error during request decoding
if let Some(mut payload) = self.payload.take() {
payload.set_error(PayloadError::EncodingCorrupted);
}
// Malformed requests should be responded with 400
self.flags.insert(Flags::STOP_READING);
self.read_buf.clear();
self.error = Some(e.into());
DispatcherMessage::Error(Response::BadRequest().finish().drop_body())
}
fn decode_payload(&mut self) -> Result<bool, DispatchError> {
if self.flags.contains(Flags::READ_EOF)
|| self.payload.is_none()
|| self.read_buf.is_empty()
{
return Ok(false);
}
let mut updated = false;
loop {
match self.codec.decode(&mut self.read_buf) {
Ok(Some(msg)) => {
updated = true;
self.flags.insert(Flags::STARTED);
match msg {
Message::Item(mut req) => {
let pl = self.codec.message_type();
req.head_mut().peer_addr = self.peer_addr;
// set on_connect data
if let Some(ref on_connect) = self.on_connect {
on_connect.set(&mut req.extensions_mut());
}
// handle upgrade request
if pl == MessageType::Stream && self.config.upgrade.is_some()
{
self.flags.insert(Flags::STOP_READING);
self.messages.push_back(DispatcherMessage::Upgrade(req));
break;
}
// handle request with payload
if pl == MessageType::Payload || pl == MessageType::Stream {
let (ps, pl) = Payload::create(false);
let (req1, _) =
req.replace_payload(crate::http::Payload::H1(pl));
req = req1;
self.payload = Some(ps);
}
self.messages.push_back(DispatcherMessage::Request(req));
Message::Item(_) => {
self.internal_error(
"Internal server error: unexpected http message",
);
break;
}
Message::Chunk(Some(chunk)) => {
if let Some(ref mut payload) = self.payload {
@ -768,20 +755,12 @@ where
}
}
}
Ok(None) => break,
Ok(None) => {
self.flags.insert(Flags::READ_EOF);
break;
}
Err(e) => {
// error during request decoding
if let Some(mut payload) = self.payload.take() {
payload.set_error(PayloadError::EncodingCorrupted);
}
// Malformed requests should be responded with 400
self.messages.push_back(DispatcherMessage::Error(
Response::BadRequest().finish().drop_body(),
));
self.flags.insert(Flags::STOP_READING);
self.read_buf.clear();
self.error = Some(e.into());
self.decode_error(e);
break;
}
}
@ -793,11 +772,71 @@ where
}
}
if updated {
Ok(PollRead::HasUpdates)
} else {
Ok(PollRead::NoUpdates)
Ok(updated)
}
fn decode(&mut self) -> Result<Option<DispatcherMessage>, DispatchError> {
if self.flags.contains(Flags::READ_EOF) || self.read_buf.is_empty() {
return Ok(None);
}
let mut updated = false;
let result = loop {
match self.codec.decode(&mut self.read_buf) {
Ok(Some(msg)) => {
updated = true;
self.flags.insert(Flags::STARTED);
match msg {
Message::Item(mut req) => {
let pl = self.codec.message_type();
req.head_mut().peer_addr = self.peer_addr;
// set on_connect data
if let Some(ref on_connect) = self.on_connect {
on_connect.set(&mut req.extensions_mut());
}
// handle upgrade request
if pl == MessageType::Stream && self.config.upgrade.is_some()
{
self.flags.insert(Flags::STOP_READING);
break Some(DispatcherMessage::Upgrade(req));
}
// handle request with payload
if pl == MessageType::Payload || pl == MessageType::Stream {
let (ps, pl) = Payload::create(false);
let (req1, _) =
req.replace_payload(crate::http::Payload::H1(pl));
req = req1;
self.payload = Some(ps);
}
break Some(DispatcherMessage::Request(req));
}
Message::Chunk(_) => {
break Some(self.internal_error(
"Internal server error: unexpected payload chunk",
))
}
}
}
Ok(None) => {
self.flags.insert(Flags::READ_EOF);
break None;
}
Err(e) => break Some(self.decode_error(e)),
}
};
if updated && self.ka_timer.is_some() {
if let Some(expire) = self.config.keep_alive_expire() {
self.ka_expire = expire;
}
}
Ok(result)
}
/// keep-alive timer
@ -806,43 +845,42 @@ where
cx: &mut Context<'_>,
processing: bool,
) -> Result<bool, DispatchError> {
if let Some(ref mut ka_timer) = self.ka_timer {
// do nothing for disconnected or upgrade socket or if keep-alive timer is disabled
if self.flags.contains(Flags::DISCONNECT) {
return Ok(false);
let ka_timer = self.ka_timer.as_mut().unwrap();
// do nothing for disconnected or upgrade socket or if keep-alive timer is disabled
if self.flags.contains(Flags::DISCONNECT) {
return Ok(false);
}
// slow request timeout
else if !self.flags.contains(Flags::STARTED) {
if Pin::new(ka_timer).poll(cx).is_ready() {
// timeout on first request (slow request) return 408
trace!("Slow request timeout");
let _ = self.send_response(
Response::RequestTimeout().finish().drop_body(),
ResponseBody::Other(Body::Empty),
);
self.flags.insert(Flags::STARTED | Flags::SHUTDOWN);
return Ok(true);
}
// slow request timeout
else if !self.flags.contains(Flags::STARTED) {
if Pin::new(ka_timer).poll(cx).is_ready() {
// timeout on first request (slow request) return 408
trace!("Slow request timeout");
let _ = self.send_response(
Response::RequestTimeout().finish().drop_body(),
ResponseBody::Other(Body::Empty),
);
self.flags.insert(Flags::STARTED | Flags::SHUTDOWN);
return Ok(true);
}
}
// normal keep-alive, but only if we are not processing any requests
else if !processing {
// keep-alive timer
if Pin::new(&mut *ka_timer).poll(cx).is_ready() {
if ka_timer.deadline() >= self.ka_expire {
// check for any outstanding tasks
if self.write_buf.is_empty() {
trace!("Keep-alive timeout, close connection");
self.flags.insert(Flags::SHUTDOWN);
return Ok(true);
} else if let Some(dl) = self.config.keep_alive_expire() {
// extend keep-alive timer
ka_timer.reset(dl);
}
} else {
ka_timer.reset(self.ka_expire);
}
// normal keep-alive, but only if we are not processing any requests
else if !processing {
// keep-alive timer
if Pin::new(&mut *ka_timer).poll(cx).is_ready() {
if ka_timer.deadline() >= self.ka_expire {
// check for any outstanding tasks
if self.write_buf.is_empty() {
trace!("Keep-alive timeout, close connection");
self.flags.insert(Flags::SHUTDOWN);
return Ok(true);
} else if let Some(dl) = self.config.keep_alive_expire() {
// extend keep-alive timer
ka_timer.reset(dl);
}
let _ = Pin::new(ka_timer).poll(cx);
} else {
ka_timer.reset(self.ka_expire);
}
let _ = Pin::new(ka_timer).poll(cx);
}
}
Ok(false)
@ -865,9 +903,13 @@ where
&mut self,
io: CallProcess<S, X, U>,
) -> Result<CallProcess<S, X, U>, DispatchError> {
while let Some(msg) = self.messages.pop_front() {
while let Some(msg) = self.decode()? {
return match msg {
DispatcherMessage::Request(req) => {
if self.payload.is_some() {
self.decode_payload()?;
}
// Handle `EXPECT: 100-Continue` header
Ok(CallProcess::Next(if req.head().expect() {
CallState::Expect(self.config.expect.call(req))