refactor http1 keep-alive

This commit is contained in:
Nikolay Kim 2020-05-04 08:37:14 +06:00
parent 34184ba83f
commit 69366b47e1

View file

@ -130,15 +130,6 @@ enum CallState<S: Service, X: Service> {
Service(#[pin] S::Future),
}
impl<S: Service, X: Service> CallState<S, X> {
fn is_io(&self) -> bool {
match self {
CallState::Io => true,
_ => false,
}
}
}
enum CallProcess<S: Service, X: Service, U: Service> {
/// next call is available
Next(CallState<S, X>),
@ -259,12 +250,6 @@ where
});
}
// keep-alive book-keeping
this.inner.poll_keepalive(
cx,
this.call.is_io() && this.inner.send_payload.is_none(),
)?;
// shutdown process
if this.inner.flags.contains(Flags::SHUTDOWN) {
return this.inner.poll_shutdown(cx);
@ -414,7 +399,12 @@ where
return if this.inner.flags.contains(Flags::SHUTDOWN) {
this.inner.poll_shutdown(cx)
} else {
Poll::Pending
// keep-alive book-keeping
if this.inner.poll_keepalive(cx, processing)? {
this.inner.poll_shutdown(cx)
} else {
Poll::Pending
}
};
}
}
@ -815,49 +805,48 @@ where
fn poll_keepalive(
&mut self,
cx: &mut Context<'_>,
is_empty: bool,
) -> Result<(), DispatchError> {
// do nothing for disconnected or upgrade socket or if keep-alive timer is disabled
if self.flags.contains(Flags::DISCONNECT) || self.ka_timer.is_none() {
return Ok(());
}
if !self.flags.contains(Flags::STARTED) {
processing: bool,
) -> Result<bool, DispatchError> {
if let Some(ref mut ka_timer) = self.ka_timer {
// do nothing for disconnected or upgrade socket or if keep-alive timer is disabled
if self.flags.contains(Flags::DISCONNECT) {
return Ok(false);
}
// slow request timeout
if Pin::new(&mut self.ka_timer.as_mut().unwrap())
.poll(cx)
.is_ready()
{
// timeout on first request (slow request) return 408
trace!("Slow request timeout");
let _ = self.send_response(
Response::RequestTimeout().finish().drop_body(),
ResponseBody::Other(Body::Empty),
);
self.flags.insert(Flags::STARTED | Flags::SHUTDOWN);
}
} else {
let mut timer = self.ka_timer.as_mut().unwrap();
// keep-alive timer
if Pin::new(&mut timer).poll(cx).is_ready() {
if timer.deadline() >= self.ka_expire {
// check for any outstanding tasks
if is_empty && self.write_buf.is_empty() {
trace!("Keep-alive timeout, close connection");
self.flags.insert(Flags::SHUTDOWN);
return Ok(());
} else if let Some(dl) = self.config.keep_alive_expire() {
// extend keep-alive timer
timer.reset(dl);
}
} else {
timer.reset(self.ka_expire);
else if !self.flags.contains(Flags::STARTED) {
if Pin::new(ka_timer).poll(cx).is_ready() {
// timeout on first request (slow request) return 408
trace!("Slow request timeout");
let _ = self.send_response(
Response::RequestTimeout().finish().drop_body(),
ResponseBody::Other(Body::Empty),
);
self.flags.insert(Flags::STARTED | Flags::SHUTDOWN);
return Ok(true);
}
}
// normal keep-alive, but only if we are not processing any requests
else if !processing {
// keep-alive timer
if Pin::new(&mut *ka_timer).poll(cx).is_ready() {
if ka_timer.deadline() >= self.ka_expire {
// check for any outstanding tasks
if self.write_buf.is_empty() {
trace!("Keep-alive timeout, close connection");
self.flags.insert(Flags::SHUTDOWN);
return Ok(true);
} else if let Some(dl) = self.config.keep_alive_expire() {
// extend keep-alive timer
ka_timer.reset(dl);
}
} else {
ka_timer.reset(self.ka_expire);
}
let _ = Pin::new(ka_timer).poll(cx);
}
let _ = Pin::new(&mut timer).poll(cx);
}
}
Ok(())
Ok(false)
}
fn process_response(