From b0aab5aaa1cd9081b94e4982fd45284f3652b59f Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 29 Jul 2020 03:23:32 +0600 Subject: [PATCH] optimize http/1 dispatcher --- ntex/CHANGES.md | 4 + ntex/Cargo.toml | 2 +- ntex/src/connect/service.rs | 2 +- ntex/src/http/h1/dispatcher.rs | 428 ++++++++++++++++++--------------- 4 files changed, 241 insertions(+), 195 deletions(-) diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 8602a0b1..76dcb4bf 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.1.21] - 2020-07-29 + +* Optimize http/1 dispatcher + ## [0.1.20] - 2020-07-06 * ntex::util: Add `Buffer` service diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 2cf11c6a..9b7ada43 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.1.20" +version = "0.1.21" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" diff --git a/ntex/src/connect/service.rs b/ntex/src/connect/service.rs index b2ee84a3..82ca0b2a 100644 --- a/ntex/src/connect/service.rs +++ b/ntex/src/connect/service.rs @@ -223,7 +223,7 @@ mod tests { let srv = Connector::default(); let result = srv.connect("").await; assert!(result.is_err()); - let result = srv.connect("localhost-111").await; + let result = srv.connect("localhost:99999").await; assert!(result.is_err()); let srv = Connector::default(); diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index 8ebc21b0..08dfaaae 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -13,7 +13,7 @@ use pin_project::pin_project; use crate::codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts}; use crate::http::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::http::config::DispatcherConfig; -use crate::http::error::{DispatchError, PayloadError, ResponseError}; +use crate::http::error::{DispatchError, ParseError, PayloadError, ResponseError}; use crate::http::helpers::DataFactory; use crate::http::request::Request; use crate::http::response::Response; @@ -32,7 +32,7 @@ const BUFFER_SIZE: usize = 32_768; const LW_PIPELINED_MESSAGES: usize = 1; bitflags! { - pub struct Flags: u8 { + pub struct Flags: u16 { /// We parsed one complete request message const STARTED = 0b0000_0001; /// Keep-alive is enabled on current connection @@ -49,6 +49,8 @@ bitflags! { const SHUTDOWN_TM = 0b0100_0000; /// Connection is upgraded const UPGRADE = 0b1000_0000; + /// All data has been read + const READ_EOF = 0b0001_0000_0000; } } @@ -115,13 +117,7 @@ enum PollWrite { Pending, /// waiting for response stream (app response) /// or write buffer is full - PendingRespnse, -} - -#[derive(Copy, Clone, PartialEq, Eq)] -enum PollRead { - NoUpdates, - HasUpdates, + PendingResponse, } #[pin_project(project = CallStateProject)] @@ -188,9 +184,9 @@ where ) -> Self { let keepalive = config.keep_alive_enabled(); let flags = if keepalive { - Flags::KEEPALIVE + Flags::KEEPALIVE | Flags::READ_EOF } else { - Flags::empty() + Flags::READ_EOF }; // keep-alive timer @@ -255,47 +251,61 @@ where return this.inner.poll_shutdown(cx); } + // process incoming bytes stream + let mut not_completed = !this.inner.poll_read(cx)?; + this.inner.decode_payload()?; + loop { - // process incoming stream - this.inner.poll_read(cx)?; + // process incoming bytes stream, but only if + // previous iteration didnt read whole buffer + if not_completed { + not_completed = !this.inner.poll_read(cx)?; + } let st = match this.call.project() { - CallStateProject::Service(mut fut) => loop { - // we have to loop because of - // read back-pressure, check Poll::Pending processing - match fut.poll(cx) { - Poll::Ready(result) => match result { - Ok(res) => break this.inner.process_response(res.into())?, - Err(e) => { - let res: Response = e.into(); - break this.inner.process_response( - res.map_body(|_, body| body.into_body()), - )?; - } - }, - Poll::Pending => { - // if read-backpressure is enabled, we might need - // to read more data (ie service future can wait for payload data) - if this.inner.payload.is_some() - && this.inner.poll_read(cx)? == PollRead::HasUpdates - { - // poll_request has read more data, try - // to poll service future again + CallStateProject::Service(mut fut) => { + let has_payload = this.inner.payload.is_some(); - // restore consumed future - this = self.as_mut().project(); - fut = { - match this.call.project() { - CallStateProject::Service(fut) => fut, - _ => panic!(), + loop { + // we have to loop because of + // read back-pressure, check Poll::Pending processing + match fut.poll(cx) { + Poll::Ready(result) => match result { + Ok(res) => { + break this.inner.process_response(res.into())? + } + Err(e) => { + let res: Response = e.into(); + break this.inner.process_response( + res.map_body(|_, body| body.into_body()), + )?; + } + }, + Poll::Pending => { + // if read back-pressure is enabled, we might need + // to read more data (ie serevice future can wait for payload data) + if has_payload && not_completed { + // read more from io stream + not_completed = !this.inner.poll_read(cx)?; + + // more payload chunks has been decoded + if this.inner.decode_payload()? { + // restore consumed future + this = self.as_mut().project(); + fut = { + match this.call.project() { + CallStateProject::Service(fut) => fut, + _ => panic!(), + } + }; + continue; } - }; - continue; + } + break CallProcess::Pending; } - break CallProcess::Pending; } } - }, + } // handle EXPECT call CallStateProject::Expect(fut) => match fut.poll(cx) { Poll::Ready(result) => match result { @@ -340,7 +350,13 @@ where } CallProcess::Io => { // service call queue is empty, we can process next request - match this.inner.poll_write(cx)? { + let write = if !this.inner.flags.contains(Flags::STARTED) { + PollWrite::AllowNext + } else { + this.inner.decode_payload()?; + this.inner.poll_write(cx)? + }; + match write { PollWrite::AllowNext => { match this.inner.process_messages(CallProcess::Io)? { CallProcess::Next(st) => { @@ -357,7 +373,7 @@ where } } PollWrite::Pending => false, - PollWrite::PendingRespnse => { + PollWrite::PendingResponse => { !this.inner.flags.contains(Flags::DISCONNECT) } } @@ -398,8 +414,16 @@ where return if this.inner.flags.contains(Flags::SHUTDOWN) { this.inner.poll_shutdown(cx) } else { + if this.inner.poll_flush(cx)? { + // some data has been written to io stream + this = self.as_mut().project(); + continue; + } + // keep-alive book-keeping - if this.inner.poll_keepalive(cx, processing)? { + if this.inner.ka_timer.is_some() + && this.inner.poll_keepalive(cx, processing)? + { this.inner.poll_shutdown(cx) } else { Poll::Pending @@ -490,17 +514,17 @@ where } /// Flush stream - fn poll_flush(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchError> { + fn poll_flush(&mut self, cx: &mut Context<'_>) -> Result { let len = self.write_buf.len(); if len == 0 { - return Ok(()); + return Ok(false); } let mut written = 0; + let mut io = self.io.as_mut().unwrap(); + while written < len { - match Pin::new(self.io.as_mut().unwrap()) - .poll_write(cx, &self.write_buf[written..]) - { + match Pin::new(&mut io).poll_write(cx, &self.write_buf[written..]) { Poll::Ready(Ok(n)) => { if n == 0 { trace!("Disconnected during flush, written {}", written); @@ -520,12 +544,12 @@ where } } if written == len { - // flushed same amount as buffer, we dont need to reallocate + // flushed whole buffer, we dont need to reallocate unsafe { self.write_buf.set_len(0) } } else { self.write_buf.advance(written); } - Ok(()) + Ok(written != 0) } fn send_response( @@ -561,8 +585,6 @@ where } fn poll_write(&mut self, cx: &mut Context<'_>) -> Result { - let mut flushed = false; - while let Some(ref mut stream) = self.send_payload { let len = self.write_buf.len(); @@ -576,13 +598,11 @@ where match stream.poll_next_chunk(cx) { Poll::Ready(Some(Ok(item))) => { trace!("Got response chunk: {:?}", item.len()); - flushed = false; self.codec .encode(Message::Chunk(Some(item)), &mut self.write_buf)?; } Poll::Ready(None) => { trace!("Response payload eof"); - flushed = false; self.codec .encode(Message::Chunk(None), &mut self.write_buf)?; self.send_payload = None; @@ -593,29 +613,16 @@ where return Err(DispatchError::Unknown); } Poll::Pending => { - // response payload stream is not ready - // we can only flush - if !flushed { - self.poll_flush(cx)?; - } - return Ok(PollWrite::PendingRespnse); + // response payload stream is not ready we can only flush + return Ok(PollWrite::PendingResponse); } } } else { - // write buffer is full, try to flush and check if we have - // space in buffer - flushed = true; - self.poll_flush(cx)?; - if self.write_buf.len() >= BUFFER_SIZE { - return Ok(PollWrite::PendingRespnse); - } + // write buffer is full, we need to flush + return Ok(PollWrite::PendingResponse); } } - if !flushed { - self.poll_flush(cx)?; - } - // we have enought space in write bffer if self.write_buf.len() < BUFFER_SIZE { Ok(PollWrite::AllowNext) @@ -625,7 +632,9 @@ where } /// Process one incoming requests - fn poll_read(&mut self, cx: &mut Context<'_>) -> Result { + fn poll_read(&mut self, cx: &mut Context<'_>) -> Result { + let mut completed = false; + // read socket data into a buf if !self .flags @@ -641,24 +650,26 @@ where .map(|info| info.need_read(cx) == PayloadStatus::Read) .unwrap_or(true) { - return Ok(PollRead::NoUpdates); + return Ok(false); } // read data from socket let io = self.io.as_mut().unwrap(); let buf = &mut self.read_buf; - let mut updated = false; - while buf.len() < MAX_BUFFER_SIZE { - // increase read buffer size - let remaining = buf.capacity() - buf.len(); - if remaining < READ_LW_BUFFER_SIZE { - buf.reserve(BUFFER_SIZE); - } + // increase read buffer size + let remaining = buf.capacity() - buf.len(); + if remaining < READ_LW_BUFFER_SIZE { + buf.reserve(BUFFER_SIZE); + } + + while buf.len() < MAX_BUFFER_SIZE { match Pin::new(&mut *io).poll_read_buf(cx, buf) { - Poll::Pending => break, + Poll::Pending => { + completed = true; + break; + } Poll::Ready(Ok(n)) => { - updated = true; if n == 0 { trace!( "Disconnected during read, buffer size {}", @@ -667,6 +678,7 @@ where self.flags.insert(Flags::DISCONNECT); break; } + self.flags.remove(Flags::READ_EOF); } Poll::Ready(Err(e)) => { trace!("Error during read: {:?}", e); @@ -676,75 +688,50 @@ where } } } - - if !updated { - return Ok(PollRead::NoUpdates); - } } - let result = if self.read_buf.is_empty() { - Ok(PollRead::NoUpdates) - } else { - self.input_decode() - }; - - // socket is disconnected clear read buf - if self.flags.contains(Flags::DISCONNECT) { - self.read_buf.clear(); - // decode operation wont run again, so we have to - // stop payload stream - if let Some(mut payload) = self.payload.take() { - payload.feed_eof(); - } - } - result + Ok(completed) } - fn internal_error(&mut self, msg: &'static str) { + fn internal_error(&mut self, msg: &'static str) -> DispatcherMessage { error!("{}", msg); - self.flags.insert(Flags::DISCONNECT); - self.messages.push_back(DispatcherMessage::Error( - Response::InternalServerError().finish().drop_body(), - )); + self.flags.insert(Flags::DISCONNECT | Flags::READ_EOF); self.error = Some(DispatchError::InternalError); + DispatcherMessage::Error(Response::InternalServerError().finish().drop_body()) } - fn input_decode(&mut self) -> Result { + fn decode_error(&mut self, e: ParseError) -> DispatcherMessage { + // error during request decoding + if let Some(mut payload) = self.payload.take() { + payload.set_error(PayloadError::EncodingCorrupted); + } + + // Malformed requests should be responded with 400 + self.flags.insert(Flags::STOP_READING); + self.read_buf.clear(); + self.error = Some(e.into()); + DispatcherMessage::Error(Response::BadRequest().finish().drop_body()) + } + + fn decode_payload(&mut self) -> Result { + if self.flags.contains(Flags::READ_EOF) + || self.payload.is_none() + || self.read_buf.is_empty() + { + return Ok(false); + } + let mut updated = false; loop { match self.codec.decode(&mut self.read_buf) { Ok(Some(msg)) => { updated = true; - self.flags.insert(Flags::STARTED); - match msg { - Message::Item(mut req) => { - let pl = self.codec.message_type(); - req.head_mut().peer_addr = self.peer_addr; - - // set on_connect data - if let Some(ref on_connect) = self.on_connect { - on_connect.set(&mut req.extensions_mut()); - } - - // handle upgrade request - if pl == MessageType::Stream && self.config.upgrade.is_some() - { - self.flags.insert(Flags::STOP_READING); - self.messages.push_back(DispatcherMessage::Upgrade(req)); - break; - } - - // handle request with payload - if pl == MessageType::Payload || pl == MessageType::Stream { - let (ps, pl) = Payload::create(false); - let (req1, _) = - req.replace_payload(crate::http::Payload::H1(pl)); - req = req1; - self.payload = Some(ps); - } - - self.messages.push_back(DispatcherMessage::Request(req)); + Message::Item(_) => { + self.internal_error( + "Internal server error: unexpected http message", + ); + break; } Message::Chunk(Some(chunk)) => { if let Some(ref mut payload) = self.payload { @@ -768,20 +755,12 @@ where } } } - Ok(None) => break, + Ok(None) => { + self.flags.insert(Flags::READ_EOF); + break; + } Err(e) => { - // error during request decoding - if let Some(mut payload) = self.payload.take() { - payload.set_error(PayloadError::EncodingCorrupted); - } - - // Malformed requests should be responded with 400 - self.messages.push_back(DispatcherMessage::Error( - Response::BadRequest().finish().drop_body(), - )); - self.flags.insert(Flags::STOP_READING); - self.read_buf.clear(); - self.error = Some(e.into()); + self.decode_error(e); break; } } @@ -793,11 +772,71 @@ where } } - if updated { - Ok(PollRead::HasUpdates) - } else { - Ok(PollRead::NoUpdates) + Ok(updated) + } + + fn decode(&mut self) -> Result, DispatchError> { + if self.flags.contains(Flags::READ_EOF) || self.read_buf.is_empty() { + return Ok(None); } + + let mut updated = false; + let result = loop { + match self.codec.decode(&mut self.read_buf) { + Ok(Some(msg)) => { + updated = true; + self.flags.insert(Flags::STARTED); + + match msg { + Message::Item(mut req) => { + let pl = self.codec.message_type(); + req.head_mut().peer_addr = self.peer_addr; + + // set on_connect data + if let Some(ref on_connect) = self.on_connect { + on_connect.set(&mut req.extensions_mut()); + } + + // handle upgrade request + if pl == MessageType::Stream && self.config.upgrade.is_some() + { + self.flags.insert(Flags::STOP_READING); + break Some(DispatcherMessage::Upgrade(req)); + } + + // handle request with payload + if pl == MessageType::Payload || pl == MessageType::Stream { + let (ps, pl) = Payload::create(false); + let (req1, _) = + req.replace_payload(crate::http::Payload::H1(pl)); + req = req1; + self.payload = Some(ps); + } + + break Some(DispatcherMessage::Request(req)); + } + Message::Chunk(_) => { + break Some(self.internal_error( + "Internal server error: unexpected payload chunk", + )) + } + } + } + Ok(None) => { + self.flags.insert(Flags::READ_EOF); + break None; + } + Err(e) => break Some(self.decode_error(e)), + } + }; + + if updated && self.ka_timer.is_some() { + if let Some(expire) = self.config.keep_alive_expire() { + self.ka_expire = expire; + } + } + + Ok(result) } /// keep-alive timer @@ -806,43 +845,42 @@ where cx: &mut Context<'_>, processing: bool, ) -> Result { - if let Some(ref mut ka_timer) = self.ka_timer { - // do nothing for disconnected or upgrade socket or if keep-alive timer is disabled - if self.flags.contains(Flags::DISCONNECT) { - return Ok(false); + let ka_timer = self.ka_timer.as_mut().unwrap(); + // do nothing for disconnected or upgrade socket or if keep-alive timer is disabled + if self.flags.contains(Flags::DISCONNECT) { + return Ok(false); + } + // slow request timeout + else if !self.flags.contains(Flags::STARTED) { + if Pin::new(ka_timer).poll(cx).is_ready() { + // timeout on first request (slow request) return 408 + trace!("Slow request timeout"); + let _ = self.send_response( + Response::RequestTimeout().finish().drop_body(), + ResponseBody::Other(Body::Empty), + ); + self.flags.insert(Flags::STARTED | Flags::SHUTDOWN); + return Ok(true); } - // slow request timeout - else if !self.flags.contains(Flags::STARTED) { - if Pin::new(ka_timer).poll(cx).is_ready() { - // timeout on first request (slow request) return 408 - trace!("Slow request timeout"); - let _ = self.send_response( - Response::RequestTimeout().finish().drop_body(), - ResponseBody::Other(Body::Empty), - ); - self.flags.insert(Flags::STARTED | Flags::SHUTDOWN); - return Ok(true); - } - } - // normal keep-alive, but only if we are not processing any requests - else if !processing { - // keep-alive timer - if Pin::new(&mut *ka_timer).poll(cx).is_ready() { - if ka_timer.deadline() >= self.ka_expire { - // check for any outstanding tasks - if self.write_buf.is_empty() { - trace!("Keep-alive timeout, close connection"); - self.flags.insert(Flags::SHUTDOWN); - return Ok(true); - } else if let Some(dl) = self.config.keep_alive_expire() { - // extend keep-alive timer - ka_timer.reset(dl); - } - } else { - ka_timer.reset(self.ka_expire); + } + // normal keep-alive, but only if we are not processing any requests + else if !processing { + // keep-alive timer + if Pin::new(&mut *ka_timer).poll(cx).is_ready() { + if ka_timer.deadline() >= self.ka_expire { + // check for any outstanding tasks + if self.write_buf.is_empty() { + trace!("Keep-alive timeout, close connection"); + self.flags.insert(Flags::SHUTDOWN); + return Ok(true); + } else if let Some(dl) = self.config.keep_alive_expire() { + // extend keep-alive timer + ka_timer.reset(dl); } - let _ = Pin::new(ka_timer).poll(cx); + } else { + ka_timer.reset(self.ka_expire); } + let _ = Pin::new(ka_timer).poll(cx); } } Ok(false) @@ -865,9 +903,13 @@ where &mut self, io: CallProcess, ) -> Result, DispatchError> { - while let Some(msg) = self.messages.pop_front() { + while let Some(msg) = self.decode()? { return match msg { DispatcherMessage::Request(req) => { + if self.payload.is_some() { + self.decode_payload()?; + } + // Handle `EXPECT: 100-Continue` header Ok(CallProcess::Next(if req.head().expect() { CallState::Expect(self.config.expect.call(req))