better naming and comments

This commit is contained in:
Nikolay Kim 2020-07-29 10:12:32 +06:00
parent 4adfc234c9
commit 9ccbf5d74f

View file

@ -90,8 +90,8 @@ where
flags: Flags,
error: Option<DispatchError>,
send_payload: Option<ResponseBody<B>>,
payload: Option<PayloadSender>,
res_payload: Option<ResponseBody<B>>,
req_payload: Option<PayloadSender>,
ka_expire: Instant,
ka_timer: Option<Delay>,
@ -205,8 +205,8 @@ where
upgrade: None,
inner: InnerDispatcher {
write_buf: BytesMut::with_capacity(WRITE_HW_BUFFER_SIZE),
payload: None,
send_payload: None,
req_payload: None,
res_payload: None,
error: None,
io: Some(io),
config,
@ -239,7 +239,7 @@ where
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
// upgrade
// handle upgrade request
if this.inner.flags.contains(Flags::UPGRADE) {
return this.upgrade.as_pin_mut().unwrap().poll(cx).map_err(|e| {
error!("Upgrade handler error: {}", e);
@ -265,11 +265,9 @@ where
let st = match this.call.project() {
CallStateProject::Service(mut fut) => {
let has_payload = this.inner.payload.is_some();
loop {
// we have to loop because of
// read back-pressure, check Poll::Pending processing
// we have to loop because of read back-pressure,
// check Poll::Pending processing
match fut.poll(cx) {
Poll::Ready(result) => match result {
Ok(res) => {
@ -285,7 +283,7 @@ where
Poll::Pending => {
// if read back-pressure is enabled, we might need
// to read more data (ie serevice future can wait for payload data)
if has_payload && not_completed {
if this.inner.req_payload.is_some() && not_completed {
// read more from io stream
not_completed = !this.inner.poll_read(cx)?;
@ -561,11 +559,12 @@ where
trace!("Sending response: {:?} body: {:?}", msg, body.size());
// we dont need to process responses if socket is disconnected
// but we still want to handle requests with app service
// so we skip response processing for disconnected connection
if !self.flags.contains(Flags::DISCONNECT) {
self.codec
.encode(Message::Item((msg, body.size())), &mut self.write_buf)
.map_err(|err| {
if let Some(mut payload) = self.payload.take() {
if let Some(mut payload) = self.req_payload.take() {
payload.set_error(PayloadError::Incomplete(None));
}
DispatchError::Io(err)
@ -574,9 +573,17 @@ where
self.flags.set(Flags::KEEPALIVE, self.codec.keepalive());
match body.size() {
BodySize::None | BodySize::Empty => Ok(true),
BodySize::None | BodySize::Empty => {
// update keep-alive timer
if self.flags.contains(Flags::HAS_KEEPALIVE) {
if let Some(expire) = self.config.keep_alive_expire() {
self.ka_expire = expire;
}
}
Ok(true)
}
_ => {
self.send_payload = Some(body);
self.res_payload = Some(body);
Ok(false)
}
}
@ -586,7 +593,7 @@ where
}
fn poll_write(&mut self, cx: &mut Context<'_>) -> Result<PollWrite, DispatchError> {
while let Some(ref mut stream) = self.send_payload {
while let Some(ref mut stream) = self.res_payload {
let len = self.write_buf.len();
if len < BUFFER_SIZE {
@ -606,7 +613,14 @@ where
trace!("Response payload eof");
self.codec
.encode(Message::Chunk(None), &mut self.write_buf)?;
self.send_payload = None;
self.res_payload = None;
// update keep-alive timer
if self.flags.contains(Flags::HAS_KEEPALIVE) {
if let Some(expire) = self.config.keep_alive_expire() {
self.ka_expire = expire;
}
}
break;
}
Poll::Ready(Some(Err(e))) => {
@ -643,7 +657,7 @@ where
{
// drain until request payload is consumed and requires more data (backpressure off)
if !self
.payload
.req_payload
.as_ref()
.map(|info| info.need_read(cx) == PayloadStatus::Read)
.unwrap_or(true)
@ -700,7 +714,7 @@ where
fn decode_error(&mut self, e: ParseError) -> DispatcherMessage {
// error during request decoding
if let Some(mut payload) = self.payload.take() {
if let Some(mut payload) = self.req_payload.take() {
payload.set_error(PayloadError::EncodingCorrupted);
}
@ -713,7 +727,7 @@ where
fn decode_payload(&mut self) -> Result<bool, DispatchError> {
if self.flags.contains(Flags::READ_EOF)
|| self.payload.is_none()
|| self.req_payload.is_none()
|| self.read_buf.is_empty()
{
return Ok(false);
@ -722,40 +736,29 @@ where
let mut updated = false;
loop {
match self.codec.decode(&mut self.read_buf) {
Ok(Some(msg)) => {
match msg {
Message::Chunk(chunk) => {
updated = true;
if let Some(ref mut payload) = self.payload {
if let Some(chunk) = chunk {
payload.feed_data(chunk);
} else {
payload.feed_eof();
// update keep-alive timer
if self.flags.contains(Flags::HAS_KEEPALIVE) {
if let Some(expire) =
self.config.keep_alive_expire()
{
self.ka_expire = expire;
}
}
}
Ok(Some(msg)) => match msg {
Message::Chunk(chunk) => {
updated = true;
if let Some(ref mut payload) = self.req_payload {
if let Some(chunk) = chunk {
payload.feed_data(chunk);
} else {
self.internal_error(
"Internal server error: unexpected payload chunk",
);
break;
payload.feed_eof();
}
}
Message::Item(_) => {
} else {
self.internal_error(
"Internal server error: unexpected http message",
"Internal server error: unexpected payload chunk",
);
break;
}
}
}
Message::Item(_) => {
self.internal_error(
"Internal server error: unexpected http message",
);
break;
}
},
Ok(None) => {
self.flags.insert(Flags::READ_EOF);
break;
@ -770,12 +773,12 @@ where
Ok(updated)
}
fn decode(&mut self) -> Result<Option<DispatcherMessage>, DispatchError> {
fn decode_message(&mut self) -> Result<Option<DispatcherMessage>, DispatchError> {
if self.flags.contains(Flags::READ_EOF) || self.read_buf.is_empty() {
return Ok(None);
}
let result = match self.codec.decode(&mut self.read_buf) {
match self.codec.decode(&mut self.read_buf) {
Ok(Some(msg)) => {
self.flags.insert(Flags::STARTED);
@ -792,7 +795,7 @@ where
// handle upgrade request
if pl == MessageType::Stream && self.config.upgrade.is_some() {
self.flags.insert(Flags::STOP_READING);
Some(DispatcherMessage::Upgrade(req))
Ok(Some(DispatcherMessage::Upgrade(req)))
} else {
// handle request with payload
if pl == MessageType::Payload || pl == MessageType::Stream {
@ -800,30 +803,23 @@ where
let (req1, _) =
req.replace_payload(crate::http::Payload::H1(pl));
req = req1;
self.payload = Some(ps);
} else if self.flags.contains(Flags::HAS_KEEPALIVE) {
// update keep-alive timer
if let Some(expire) = self.config.keep_alive_expire() {
self.ka_expire = expire;
}
self.req_payload = Some(ps);
}
Some(DispatcherMessage::Request(req))
Ok(Some(DispatcherMessage::Request(req)))
}
}
Message::Chunk(_) => Some(self.internal_error(
Message::Chunk(_) => Ok(Some(self.internal_error(
"Internal server error: unexpected payload chunk",
)),
))),
}
}
Ok(None) => {
self.flags.insert(Flags::READ_EOF);
None
Ok(None)
}
Err(e) => Some(self.decode_error(e)),
};
Ok(result)
Err(e) => Ok(Some(self.decode_error(e))),
}
}
/// keep-alive timer
@ -890,10 +886,10 @@ where
&mut self,
io: CallProcess<S, X, U>,
) -> Result<CallProcess<S, X, U>, DispatchError> {
while let Some(msg) = self.decode()? {
while let Some(msg) = self.decode_message()? {
return match msg {
DispatcherMessage::Request(req) => {
if self.payload.is_some() {
if self.req_payload.is_some() {
self.decode_payload()?;
}