Refactor http1 dispatcher

This commit is contained in:
Nikolay Kim 2022-05-05 14:54:30 +06:00
parent 114a7b6dba
commit a29c003f55
4 changed files with 156 additions and 140 deletions

View file

@ -349,7 +349,7 @@ impl<F> Io<F> {
if self.flags().contains(Flags::KEEPALIVE) {
timer::unregister(self.0 .0.keepalive.get(), &self.0);
}
if timeout != time::Duration::ZERO {
if !timeout.is_zero() {
log::debug!("start keep-alive timeout {:?}", timeout);
self.0 .0.insert_flags(Flags::KEEPALIVE);
self.0 .0.keepalive.set(timer::register(timeout, &self.0));

View file

@ -1,5 +1,9 @@
# Changes
## [0.5.17] - 2022-05-xx
* http: Fix handling for zero slow-request timeout
## [0.5.16] - 2022-04-05
* ws: Add keep-alive timeout support to websockets client

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.5.16"
version = "0.5.17"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"

View file

@ -50,6 +50,8 @@ enum State<B> {
Call,
#[error("State::ReadRequest")]
ReadRequest,
#[error("State::ReadFirstRequest")]
ReadFirstRequest,
#[error("State::ReadPayload")]
ReadPayload,
#[error("State::SendPayload")]
@ -100,16 +102,21 @@ where
io.set_disconnect_timeout(config.client_disconnect.into());
// slow-request timer
let flags = if config.client_timeout.is_zero() {
Flags::empty()
} else {
io.start_keepalive_timer(config.client_timeout);
Flags::KEEPALIVE_REG
};
Dispatcher {
call: CallState::None,
st: State::ReadRequest,
st: State::ReadFirstRequest,
inner: DispatcherInner {
io,
flags,
codec,
config,
flags: Flags::KEEPALIVE_REG,
error: None,
payload: None,
_t: marker::PhantomData,
@ -281,140 +288,9 @@ where
this.call.set(next);
}
}
// read request and call service
State::ReadRequest => {
log::trace!("trying to read http message");
// decode incoming bytes stream
match this.inner.io.poll_recv(&this.inner.codec, cx) {
Poll::Ready(Ok((mut req, pl))) => {
log::trace!(
"http message is received: {:?} and payload {:?}",
req,
pl
);
// configure request payload
let upgrade = match pl {
PayloadType::None => false,
PayloadType::Payload(decoder) => {
let (ps, pl) = Payload::create(false);
req.replace_payload(http::Payload::H1(pl));
this.inner.payload = Some((decoder, ps));
false
}
PayloadType::Stream(decoder) => {
if this.inner.config.upgrade.is_none() {
let (ps, pl) = Payload::create(false);
req.replace_payload(http::Payload::H1(pl));
this.inner.payload = Some((decoder, ps));
false
} else {
this.inner.flags.insert(Flags::UPGRADE);
true
}
}
};
// slow-request first request
this.inner.flags.insert(Flags::STARTED);
this.inner.flags.remove(Flags::KEEPALIVE_REG);
this.inner.io.remove_keepalive_timer();
if upgrade {
// Handle UPGRADE request
log::trace!("prep io for upgrade handler");
*this.st = State::Upgrade(Some(req));
} else {
if req.upgrade() {
this.inner.flags.insert(Flags::UPGRADE_HND);
let io: IoBoxed = this.inner.io.take().into();
req.head_mut().io = CurrentIo::Io(Rc::new((
io.get_ref(),
RefCell::new(Some(Box::new((
io,
this.inner.codec.clone(),
)))),
)));
} else {
req.head_mut().io =
CurrentIo::Ref(this.inner.io.get_ref());
}
*this.st = State::Call;
this.call.set(
if let Some(ref f) = this.inner.config.on_request {
// Handle filter fut
CallState::Filter {
fut: f.call((req, this.inner.io.get_ref())),
}
} else if req.head().expect() {
// Handle normal requests with EXPECT: 100-Continue` header
CallState::Expect {
fut: this.inner.config.expect.call(req),
}
} else if this.inner.flags.contains(Flags::UPGRADE_HND)
{
// Handle upgrade requests
CallState::ServiceUpgrade {
fut: this.inner.config.service.call(req),
}
} else {
// Handle normal requests
CallState::Service {
fut: this.inner.config.service.call(req),
}
},
);
}
}
Poll::Ready(Err(RecvError::WriteBackpressure)) => {
if let Err(err) = ready!(this.inner.io.poll_flush(cx, false)) {
log::trace!("peer is gone with {:?}", err);
*this.st = State::Stop;
this.inner.error = Some(DispatchError::PeerGone(Some(err)));
}
}
Poll::Ready(Err(RecvError::Decoder(err))) => {
// Malformed requests, respond with 400
log::trace!("malformed request: {:?}", err);
let (res, body) = Response::BadRequest().finish().into_parts();
this.inner.error = Some(DispatchError::Parse(err));
*this.st = this.inner.send_response(res, body.into_body());
}
Poll::Ready(Err(RecvError::PeerGone(err))) => {
log::trace!("peer is gone with {:?}", err);
*this.st = State::Stop;
this.inner.error = Some(DispatchError::PeerGone(err));
}
Poll::Ready(Err(RecvError::Stop)) => {
log::trace!("dispatcher is instructed to stop");
*this.st = State::Stop;
}
Poll::Ready(Err(RecvError::KeepAlive)) => {
// keep-alive timeout
if !this.inner.flags.contains(Flags::STARTED) {
log::trace!("slow request timeout");
let (req, body) =
Response::RequestTimeout().finish().into_parts();
let _ = this.inner.send_response(req, body.into_body());
this.inner.error = Some(DispatchError::SlowRequestTimeout);
} else {
log::trace!("keep-alive timeout, close connection");
}
*this.st = State::Stop;
}
Poll::Pending => {
// register keep-alive timer
if this.inner.flags.contains(Flags::KEEPALIVE)
&& !this.inner.flags.contains(Flags::KEEPALIVE_REG)
{
this.inner.flags.insert(Flags::KEEPALIVE_REG);
this.inner
.io
.start_keepalive_timer(this.inner.config.keep_alive);
}
return Poll::Pending;
}
}
*this.st = ready!(this.inner.read_request(cx, &mut this.call));
}
// consume request's payload
State::ReadPayload => {
@ -444,6 +320,11 @@ where
}
}
}
// read first request and call service
State::ReadFirstRequest => {
*this.st = ready!(this.inner.read_request(cx, &mut this.call));
this.inner.flags.insert(Flags::STARTED);
}
// stop io tasks and call upgrade service
State::Upgrade(ref mut req) => {
let io = this.inner.io.take();
@ -485,10 +366,12 @@ where
impl<T, S, B, X, U> DispatcherInner<T, S, B, X, U>
where
T: Filter,
S: Service<Request>,
S::Error: ResponseError + 'static,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request>,
{
fn switch_to_read_request(&mut self) -> State<B> {
// connection is not keep-alive, disconnect
@ -496,14 +379,20 @@ where
self.io.close();
State::Stop
} else {
// register keep-alive timer
if self.flags.contains(Flags::KEEPALIVE) {
self.flags.remove(Flags::KEEPALIVE);
self.flags.insert(Flags::KEEPALIVE_REG);
self.io.start_keepalive_timer(self.config.keep_alive);
}
State::ReadRequest
}
}
fn unregister_keepalive(&mut self) {
if self.flags.contains(Flags::KEEPALIVE) {
if self.flags.contains(Flags::KEEPALIVE_REG) {
self.io.remove_keepalive_timer();
self.flags.remove(Flags::KEEPALIVE);
self.flags.remove(Flags::KEEPALIVE | Flags::KEEPALIVE_REG);
}
}
@ -529,6 +418,129 @@ where
}
}
fn read_request(
&mut self,
cx: &mut Context<'_>,
call_state: &mut std::pin::Pin<&mut CallState<S, X>>,
) -> Poll<State<B>> {
log::trace!("trying to read http message");
loop {
let result = ready!(self.io.poll_recv(&self.codec, cx));
// decode incoming bytes stream
return match result {
Ok((mut req, pl)) => {
log::trace!("http message is received: {:?} and payload {:?}", req, pl);
// keep-alive timer
if self.flags.contains(Flags::KEEPALIVE_REG) {
self.flags.remove(Flags::KEEPALIVE_REG);
self.io.remove_keepalive_timer();
}
// configure request payload
let upgrade = match pl {
PayloadType::None => false,
PayloadType::Payload(decoder) => {
let (ps, pl) = Payload::create(false);
req.replace_payload(http::Payload::H1(pl));
self.payload = Some((decoder, ps));
false
}
PayloadType::Stream(decoder) => {
if self.config.upgrade.is_none() {
let (ps, pl) = Payload::create(false);
req.replace_payload(http::Payload::H1(pl));
self.payload = Some((decoder, ps));
false
} else {
self.flags.insert(Flags::UPGRADE);
true
}
}
};
if upgrade {
// Handle UPGRADE request
log::trace!("prep io for upgrade handler");
Poll::Ready(State::Upgrade(Some(req)))
} else {
if req.upgrade() {
self.flags.insert(Flags::UPGRADE_HND);
let io: IoBoxed = self.io.take().into();
req.head_mut().io = CurrentIo::Io(Rc::new((
io.get_ref(),
RefCell::new(Some(Box::new((io, self.codec.clone())))),
)));
} else {
req.head_mut().io = CurrentIo::Ref(self.io.get_ref());
}
call_state.set(if let Some(ref f) = self.config.on_request {
// Handle filter fut
CallState::Filter {
fut: f.call((req, self.io.get_ref())),
}
} else if req.head().expect() {
// Handle normal requests with EXPECT: 100-Continue` header
CallState::Expect {
fut: self.config.expect.call(req),
}
} else if self.flags.contains(Flags::UPGRADE_HND) {
// Handle upgrade requests
CallState::ServiceUpgrade {
fut: self.config.service.call(req),
}
} else {
// Handle normal requests
CallState::Service {
fut: self.config.service.call(req),
}
});
Poll::Ready(State::Call)
}
}
Err(RecvError::WriteBackpressure) => {
if let Err(err) = ready!(self.io.poll_flush(cx, false)) {
log::trace!("peer is gone with {:?}", err);
self.error = Some(DispatchError::PeerGone(Some(err)));
Poll::Ready(State::Stop)
} else {
continue;
}
}
Err(RecvError::Decoder(err)) => {
// Malformed requests, respond with 400
log::trace!("malformed request: {:?}", err);
let (res, body) = Response::BadRequest().finish().into_parts();
self.error = Some(DispatchError::Parse(err));
Poll::Ready(self.send_response(res, body.into_body()))
}
Err(RecvError::PeerGone(err)) => {
log::trace!("peer is gone with {:?}", err);
self.error = Some(DispatchError::PeerGone(err));
Poll::Ready(State::Stop)
}
Err(RecvError::Stop) => {
log::trace!("dispatcher is instructed to stop");
Poll::Ready(State::Stop)
}
Err(RecvError::KeepAlive) => {
// keep-alive timeout
if !self.flags.contains(Flags::STARTED) {
log::trace!("slow request timeout");
let (req, body) = Response::RequestTimeout().finish().into_parts();
let _ = self.send_response(req, body.into_body());
self.error = Some(DispatchError::SlowRequestTimeout);
} else {
log::trace!("keep-alive timeout, close connection");
}
Poll::Ready(State::Stop)
}
};
}
}
fn send_response(&mut self, msg: Response<()>, body: ResponseBody<B>) -> State<B> {
trace!("sending response: {:?} body: {:?}", msg, body.size());
// we dont need to process responses if socket is disconnected