This commit is contained in:
Nikolay Kim 2020-12-22 19:55:06 +06:00
parent 3cdfdadeba
commit 1e2bc4a9e2
15 changed files with 66 additions and 94 deletions

View file

@ -50,11 +50,11 @@ pub(super) fn requote(val: &[u8]) -> Option<String> {
#[inline] #[inline]
fn from_hex(v: u8) -> Option<u8> { fn from_hex(v: u8) -> Option<u8> {
if v >= b'0' && v <= b'9' { if (b'0'..=b'9').contains(&v) {
Some(v - 0x30) // ord('0') == 0x30 Some(v - 0x30) // ord('0') == 0x30
} else if v >= b'A' && v <= b'F' { } else if (b'A'..=b'F').contains(&v) {
Some(v - 0x41 + 10) // ord('A') == 0x41 Some(v - 0x41 + 10) // ord('A') == 0x41
} else if v >= b'a' && v <= b'f' { } else if (b'a'..=b'f').contains(&v) {
Some(v - 0x61 + 10) // ord('a') == 0x61 Some(v - 0x61 + 10) // ord('a') == 0x61
} else { } else {
None None

View file

@ -32,17 +32,16 @@ enum PathElement {
impl PathElement { impl PathElement {
fn is_str(&self) -> bool { fn is_str(&self) -> bool {
match self { matches!(self, PathElement::Str(_))
PathElement::Str(_) => true,
_ => false,
}
} }
fn into_str(self) -> String { fn into_str(self) -> String {
match self { match self {
PathElement::Str(s) => s, PathElement::Str(s) => s,
_ => panic!(), _ => panic!(),
} }
} }
fn as_str(&self) -> &str { fn as_str(&self) -> &str {
match self { match self {
PathElement::Str(s) => s.as_str(), PathElement::Str(s) => s.as_str(),
@ -380,10 +379,8 @@ impl ResourceDef {
} }
if !pattern.is_empty() { if !pattern.is_empty() {
// handle tail expression for static segment // handle tail expression for static segment
if pattern.ends_with('*') { if let Some(stripped) = pattern.strip_suffix('*') {
let pattern = let pattern = Regex::new(&format!("^{}(.+)", stripped)).unwrap();
Regex::new(&format!("^{}(.+)", &pattern[..pattern.len() - 1]))
.unwrap();
pelems.push(Segment::Dynamic { pelems.push(Segment::Dynamic {
pattern, pattern,
names: Vec::new(), names: Vec::new(),

View file

@ -244,8 +244,8 @@ impl Tree {
} }
} }
let path = if path.starts_with('/') { let path = if let Some(path) = path.strip_prefix('/') {
&path[1..] path
} else { } else {
base_skip -= 1; base_skip -= 1;
path path
@ -282,8 +282,8 @@ impl Tree {
return None; return None;
} }
let path = if path.starts_with('/') { let path = if let Some(path) = path.strip_prefix('/') {
&path[1..] path
} else { } else {
base_skip -= 1; base_skip -= 1;
path path
@ -356,11 +356,8 @@ impl Tree {
path.len() path.len()
}; };
let segment = T::unquote(&path[..idx]); let segment = T::unquote(&path[..idx]);
let quoted = if let Cow::Owned(_) = segment { let quoted = matches!(segment, Cow::Owned(_));
true
} else {
false
};
// check segment match // check segment match
let is_match = match key[0] { let is_match = match key[0] {
Segment::Static(ref pattern) => { Segment::Static(ref pattern) => {

View file

@ -114,10 +114,7 @@ impl Arbiter {
.unbounded_send(SystemCommand::RegisterArbiter(id, arb)); .unbounded_send(SystemCommand::RegisterArbiter(id, arb));
// run loop // run loop
let _ = match rt.block_on(stop_rx) { let _ = rt.block_on(stop_rx);
Ok(code) => code,
Err(_) => 1,
};
// unregister arbiter // unregister arbiter
let _ = System::current() let _ = System::current()

View file

@ -130,7 +130,7 @@ impl AsyncSystemRunner {
// run loop // run loop
lazy(|_| async { lazy(|_| async {
let res = match stop.await { match stop.await {
Ok(code) => { Ok(code) => {
if code != 0 { if code != 0 {
Err(io::Error::new( Err(io::Error::new(
@ -142,8 +142,7 @@ impl AsyncSystemRunner {
} }
} }
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
}; }
return res;
}) })
.flatten() .flatten()
} }

View file

@ -18,10 +18,7 @@ pub enum BodySize {
impl BodySize { impl BodySize {
pub fn is_eof(&self) -> bool { pub fn is_eof(&self) -> bool {
match self { matches!(self, BodySize::None | BodySize::Empty | BodySize::Sized(0))
BodySize::None | BodySize::Empty | BodySize::Sized(0) => true,
_ => false,
}
} }
} }
@ -191,14 +188,8 @@ impl MessageBody for Body {
impl PartialEq for Body { impl PartialEq for Body {
fn eq(&self, other: &Body) -> bool { fn eq(&self, other: &Body) -> bool {
match *self { match *self {
Body::None => match *other { Body::None => matches!(*other, Body::None),
Body::None => true, Body::Empty => matches!(*other, Body::Empty),
_ => false,
},
Body::Empty => match *other {
Body::Empty => true,
_ => false,
},
Body::Bytes(ref b) => match *other { Body::Bytes(ref b) => match *other {
Body::Bytes(ref b2) => b == b2, Body::Bytes(ref b2) => b == b2,
_ => false, _ => false,

View file

@ -31,10 +31,10 @@ where
trace!("Sending client request: {:?} {:?}", head, body.size()); trace!("Sending client request: {:?} {:?}", head, body.size());
let head_req = head.as_ref().method == Method::HEAD; let head_req = head.as_ref().method == Method::HEAD;
let length = body.size(); let length = body.size();
let eof = match length { let eof = matches!(
BodySize::None | BodySize::Empty | BodySize::Sized(0) => true, length,
_ => false, BodySize::None | BodySize::Empty | BodySize::Sized(0)
}; );
let mut req = Request::new(()); let mut req = Request::new(());
*req.uri_mut() = head.as_ref().uri.clone(); *req.uri_mut() = head.as_ref().uri.clone();

View file

@ -140,11 +140,11 @@ where
// use existing connection // use existing connection
Acquire::Acquired(io, created) => { Acquire::Acquired(io, created) => {
trace!("Use existing connection for {:?}", req.uri); trace!("Use existing connection for {:?}", req.uri);
return Ok(IoConnection::new( Ok(IoConnection::new(
io, io,
created, created,
Some(Acquired(key, Some(inner))), Some(Acquired(key, Some(inner))),
)); ))
} }
// open new tcp connection // open new tcp connection
Acquire::Available => { Acquire::Available => {

View file

@ -154,7 +154,7 @@ impl RequestHeadType {
SendClientRequest::new( SendClientRequest::new(
config.connector.send_request(self, body.into(), addr), config.connector.send_request(self, body.into(), addr),
response_decompress, response_decompress,
timeout.or_else(|| config.timeout), timeout.or(config.timeout),
) )
} }

View file

@ -11,7 +11,7 @@ use futures::Stream;
use crate::codec::{AsyncRead, AsyncWrite, Framed}; use crate::codec::{AsyncRead, AsyncWrite, Framed};
use crate::http::error::HttpError; use crate::http::error::HttpError;
use crate::http::header::{self, HeaderName, HeaderValue, AUTHORIZATION}; use crate::http::header::{self, HeaderName, HeaderValue, AUTHORIZATION};
use crate::http::{ConnectionType, Method, StatusCode, Uri, Version}; use crate::http::{ConnectionType, StatusCode, Uri};
use crate::http::{Payload, RequestHead}; use crate::http::{Payload, RequestHead};
use crate::rt::time::timeout; use crate::rt::time::timeout;
use crate::service::{IntoService, Service}; use crate::service::{IntoService, Service};
@ -48,8 +48,6 @@ impl WebsocketsRequest {
{ {
let mut err = None; let mut err = None;
let mut head = RequestHead::default(); let mut head = RequestHead::default();
head.method = Method::GET;
head.version = Version::HTTP_11;
match Uri::try_from(uri) { match Uri::try_from(uri) {
Ok(uri) => head.uri = uri, Ok(uri) => head.uri = uri,

View file

@ -254,14 +254,14 @@ where
} }
// process incoming bytes stream // process incoming bytes stream
let mut not_completed = !this.inner.poll_read(cx)?; let mut not_completed = !this.inner.poll_read(cx);
this.inner.decode_payload()?; this.inner.decode_payload();
loop { loop {
// process incoming bytes stream, but only if // process incoming bytes stream, but only if
// previous iteration didnt read whole buffer // previous iteration didnt read whole buffer
if not_completed { if not_completed {
not_completed = !this.inner.poll_read(cx)?; not_completed = !this.inner.poll_read(cx);
} }
let st = match this.call.project() { let st = match this.call.project() {
@ -286,10 +286,10 @@ where
// to read more data (ie serevice future can wait for payload data) // to read more data (ie serevice future can wait for payload data)
if this.inner.req_payload.is_some() && not_completed { if this.inner.req_payload.is_some() && not_completed {
// read more from io stream // read more from io stream
not_completed = !this.inner.poll_read(cx)?; not_completed = !this.inner.poll_read(cx);
// more payload chunks has been decoded // more payload chunks has been decoded
if this.inner.decode_payload()? { if this.inner.decode_payload() {
// restore consumed future // restore consumed future
this = self.as_mut().project(); this = self.as_mut().project();
fut = { fut = {
@ -353,7 +353,7 @@ where
let write = if !this.inner.flags.contains(Flags::STARTED) { let write = if !this.inner.flags.contains(Flags::STARTED) {
PollWrite::AllowNext PollWrite::AllowNext
} else { } else {
this.inner.decode_payload()?; this.inner.decode_payload();
this.inner.poll_write(cx)? this.inner.poll_write(cx)?
}; };
match write { match write {
@ -421,9 +421,7 @@ where
} }
// keep-alive book-keeping // keep-alive book-keeping
if this.inner.ka_timer.is_some() if this.inner.ka_timer.is_some() && this.inner.poll_keepalive(cx, idle) {
&& this.inner.poll_keepalive(cx, idle)?
{
this.inner.poll_shutdown(cx) this.inner.poll_shutdown(cx)
} else { } else {
Poll::Pending Poll::Pending
@ -648,7 +646,7 @@ where
} }
/// Read data from io stream /// Read data from io stream
fn poll_read(&mut self, cx: &mut Context<'_>) -> Result<bool, DispatchError> { fn poll_read(&mut self, cx: &mut Context<'_>) -> bool {
let mut completed = false; let mut completed = false;
// read socket data into a buf // read socket data into a buf
@ -663,7 +661,7 @@ where
.map(|info| info.need_read(cx) == PayloadStatus::Read) .map(|info| info.need_read(cx) == PayloadStatus::Read)
.unwrap_or(true) .unwrap_or(true)
{ {
return Ok(false); return false;
} }
// read data from socket // read data from socket
@ -703,7 +701,7 @@ where
} }
} }
Ok(completed) completed
} }
fn internal_error(&mut self, msg: &'static str) -> DispatcherMessage { fn internal_error(&mut self, msg: &'static str) -> DispatcherMessage {
@ -726,12 +724,12 @@ where
DispatcherMessage::Error(Response::BadRequest().finish().drop_body()) DispatcherMessage::Error(Response::BadRequest().finish().drop_body())
} }
fn decode_payload(&mut self) -> Result<bool, DispatchError> { fn decode_payload(&mut self) -> bool {
if self.flags.contains(Flags::READ_EOF) if self.flags.contains(Flags::READ_EOF)
|| self.req_payload.is_none() || self.req_payload.is_none()
|| self.read_buf.is_empty() || self.read_buf.is_empty()
{ {
return Ok(false); return false;
} }
let mut updated = false; let mut updated = false;
@ -772,12 +770,12 @@ where
} }
} }
Ok(updated) updated
} }
fn decode_message(&mut self) -> Result<Option<DispatcherMessage>, DispatchError> { fn decode_message(&mut self) -> Option<DispatcherMessage> {
if self.flags.contains(Flags::READ_EOF) || self.read_buf.is_empty() { if self.flags.contains(Flags::READ_EOF) || self.read_buf.is_empty() {
return Ok(None); return None;
} }
match self.codec.decode(&mut self.read_buf) { match self.codec.decode(&mut self.read_buf) {
@ -797,7 +795,7 @@ where
// handle upgrade request // handle upgrade request
if pl == MessageType::Stream && self.config.upgrade.is_some() { if pl == MessageType::Stream && self.config.upgrade.is_some() {
self.flags.insert(Flags::STOP_READING); self.flags.insert(Flags::STOP_READING);
Ok(Some(DispatcherMessage::Upgrade(req))) Some(DispatcherMessage::Upgrade(req))
} else { } else {
// handle request with payload // handle request with payload
if pl == MessageType::Payload || pl == MessageType::Stream { if pl == MessageType::Payload || pl == MessageType::Stream {
@ -808,32 +806,28 @@ where
self.req_payload = Some(ps); self.req_payload = Some(ps);
} }
Ok(Some(DispatcherMessage::Request(req))) Some(DispatcherMessage::Request(req))
} }
} }
Message::Chunk(_) => Ok(Some(self.internal_error( Message::Chunk(_) => Some(self.internal_error(
"Internal server error: unexpected payload chunk", "Internal server error: unexpected payload chunk",
))), )),
} }
} }
Ok(None) => { Ok(None) => {
self.flags.insert(Flags::READ_EOF); self.flags.insert(Flags::READ_EOF);
Ok(None) None
} }
Err(e) => Ok(Some(self.decode_error(e))), Err(e) => Some(self.decode_error(e)),
} }
} }
/// keep-alive timer /// keep-alive timer
fn poll_keepalive( fn poll_keepalive(&mut self, cx: &mut Context<'_>, idle: bool) -> bool {
&mut self,
cx: &mut Context<'_>,
idle: bool,
) -> Result<bool, DispatchError> {
let ka_timer = self.ka_timer.as_mut().unwrap(); let ka_timer = self.ka_timer.as_mut().unwrap();
// do nothing for disconnected or upgrade socket or if keep-alive timer is disabled // do nothing for disconnected or upgrade socket or if keep-alive timer is disabled
if self.flags.contains(Flags::DISCONNECT) { if self.flags.contains(Flags::DISCONNECT) {
return Ok(false); return false;
} }
// slow request timeout // slow request timeout
else if !self.flags.contains(Flags::STARTED) { else if !self.flags.contains(Flags::STARTED) {
@ -845,7 +839,7 @@ where
ResponseBody::Other(Body::Empty), ResponseBody::Other(Body::Empty),
); );
self.flags.insert(Flags::STARTED | Flags::SHUTDOWN); self.flags.insert(Flags::STARTED | Flags::SHUTDOWN);
return Ok(true); return true;
} }
} }
// normal keep-alive, but only if we are not processing any requests // normal keep-alive, but only if we are not processing any requests
@ -857,7 +851,7 @@ where
if self.write_buf.is_empty() { if self.write_buf.is_empty() {
trace!("Keep-alive timeout, close connection"); trace!("Keep-alive timeout, close connection");
self.flags.insert(Flags::SHUTDOWN); self.flags.insert(Flags::SHUTDOWN);
return Ok(true); return true;
} else if let Some(dl) = self.config.keep_alive_expire() { } else if let Some(dl) = self.config.keep_alive_expire() {
// extend keep-alive timer // extend keep-alive timer
ka_timer.reset(dl); ka_timer.reset(dl);
@ -868,7 +862,7 @@ where
let _ = Pin::new(ka_timer).poll(cx); let _ = Pin::new(ka_timer).poll(cx);
} }
} }
Ok(false) false
} }
fn process_response( fn process_response(
@ -888,11 +882,11 @@ where
&mut self, &mut self,
io: CallProcess<S, X, U>, io: CallProcess<S, X, U>,
) -> Result<CallProcess<S, X, U>, DispatchError> { ) -> Result<CallProcess<S, X, U>, DispatchError> {
while let Some(msg) = self.decode_message()? { while let Some(msg) = self.decode_message() {
return match msg { return match msg {
DispatcherMessage::Request(req) => { DispatcherMessage::Request(req) => {
if self.req_payload.is_some() { if self.req_payload.is_some() {
self.decode_payload()?; self.decode_payload();
} }
// Handle `EXPECT: 100-Continue` header // Handle `EXPECT: 100-Continue` header

View file

@ -27,10 +27,7 @@ impl ContentEncoding {
#[inline] #[inline]
/// Is the content compressed? /// Is the content compressed?
pub fn is_compressed(self) -> bool { pub fn is_compressed(self) -> bool {
match self { !matches!(self, ContentEncoding::Identity | ContentEncoding::Auto)
ContentEncoding::Identity | ContentEncoding::Auto => false,
_ => true,
}
} }
#[inline] #[inline]

View file

@ -741,6 +741,7 @@ impl fmt::Debug for ResponseBuilder {
} }
} }
#[allow(clippy::unnecessary_wraps)]
fn log_error<T: Into<HttpError>>(err: T) -> Option<HttpError> { fn log_error<T: Into<HttpError>>(err: T) -> Option<HttpError> {
let e = err.into(); let e = err.into();
error!("Error in ResponseBuilder {}", e); error!("Error in ResponseBuilder {}", e);

View file

@ -150,7 +150,7 @@ impl InternalServiceFactory for ConfiguredService {
)); ));
}; };
} }
return Ok(res); Ok(res)
} }
.boxed_local() .boxed_local()
} }
@ -271,13 +271,13 @@ where
fn new_service(&self, _: ()) -> Self::Future { fn new_service(&self, _: ()) -> Self::Future {
let fut = self.inner.new_service(()); let fut = self.inner.new_service(());
async move { async move {
return match fut.await { match fut.await {
Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService), Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService),
Err(e) => { Err(e) => {
error!("Can not construct service: {:?}", e); error!("Can not construct service: {:?}", e);
Err(()) Err(())
} }
}; }
} }
.boxed_local() .boxed_local()
} }

View file

@ -232,7 +232,7 @@ impl<Err: ErrorRenderer> FromRequest<Err> for String {
Ok(encoding Ok(encoding
.decode_without_bom_handling_and_without_replacement(&body) .decode_without_bom_handling_and_without_replacement(&body)
.map(|s| s.into_owned()) .map(|s| s.into_owned())
.ok_or_else(|| PayloadError::Decoding)?) .ok_or(PayloadError::Decoding)?)
} }
} }
.boxed_local(), .boxed_local(),
@ -249,9 +249,10 @@ pub struct PayloadConfig {
impl PayloadConfig { impl PayloadConfig {
/// Create `PayloadConfig` instance and set max size of payload. /// Create `PayloadConfig` instance and set max size of payload.
pub fn new(limit: usize) -> Self { pub fn new(limit: usize) -> Self {
let mut cfg = Self::default(); PayloadConfig {
cfg.limit = limit; limit,
cfg ..Default::default()
}
} }
/// Change max size of payload. By default max size is 256Kb /// Change max size of payload. By default max size is 256Kb