cleanup h1 dispatcher

This commit is contained in:
Nikolay Kim 2021-12-22 15:48:59 +06:00
parent 73c5a5faac
commit 95b7290e36
2 changed files with 64 additions and 82 deletions

View file

@ -356,7 +356,7 @@ impl<F: Filter> AsyncRead for Io<F> {
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let len = self.with_read_buf(|src| {
let len = cmp::min(src.len(), buf.capacity());
let len = cmp::min(src.len(), buf.remaining());
buf.put_slice(&src.split_to(len));
len
});
@ -401,7 +401,7 @@ impl AsyncRead for IoBoxed {
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let len = self.with_read_buf(|src| {
let len = cmp::min(src.len(), buf.capacity());
let len = cmp::min(src.len(), buf.remaining());
buf.put_slice(&src.split_to(len));
len
});

View file

@ -165,7 +165,7 @@ where
// (ie service future can wait for payload data)
if this.inner.payload.is_some() {
if let Err(e) =
ready!(this.inner.poll_read_payload(cx))
ready!(this.inner.poll_request_payload(cx))
{
set_error!(this, e);
}
@ -177,73 +177,62 @@ where
None
}
// handle EXPECT call
CallStateProject::Expect { fut } => match fut.poll(cx) {
Poll::Ready(result) => match result {
// expect service call must resolve before
// we can do any more io processing.
//
// TODO: check keep-alive timer interaction
CallStateProject::Expect { fut } => match ready!(fut.poll(cx)) {
Ok(req) => {
let result = this.inner.state.with_write_buf(|buf| {
buf.extend_from_slice(
b"HTTP/1.1 100 Continue\r\n\r\n",
)
});
if result.is_err() {
*this.st = State::Stop;
this.inner.unregister_keepalive();
this = self.as_mut().project();
continue;
} else if this.inner.flags.contains(Flags::UPGRADE) {
*this.st = State::Upgrade(Some(req));
this = self.as_mut().project();
continue;
} else {
Some(CallState::Service {
fut: this.inner.config.service.call(req),
})
}
}
Err(e) => {
*this.st = this.inner.handle_error(e, true);
None
}
},
// handle FILTER call
CallStateProject::Filter { fut } => {
match ready!(Pin::new(fut).poll(cx)) {
Ok(req) => {
let result =
this.inner.state.with_write_buf(|buf| {
buf.extend_from_slice(
b"HTTP/1.1 100 Continue\r\n\r\n",
)
});
if result.is_err() {
*this.st = State::Stop;
this.inner.unregister_keepalive();
this = self.as_mut().project();
continue;
} else if this.inner.flags.contains(Flags::UPGRADE) {
*this.st = State::Upgrade(Some(req));
this = self.as_mut().project();
continue;
this.inner
.codec
.set_ctype(req.head().connection_type());
if req.head().expect() {
// Handle normal requests with EXPECT: 100-Continue` header
Some(CallState::Expect {
fut: this.inner.config.expect.call(req),
})
} else {
// Handle normal requests
Some(CallState::Service {
fut: this.inner.config.service.call(req),
})
}
}
Err(e) => {
*this.st = this.inner.handle_error(e, true);
Err(res) => {
let (res, body) = res.into_parts();
*this.st =
this.inner.send_response(res, body.into_body());
None
}
},
Poll::Pending => {
// expect service call must resolve before
// we can do any more io processing.
//
// TODO: check keep-alive timer interaction
return Poll::Pending;
}
},
// handle FILTER call
CallStateProject::Filter { fut } => {
if let Poll::Ready(result) = Pin::new(fut).poll(cx) {
match result {
Ok(req) => {
this.inner
.codec
.set_ctype(req.head().connection_type());
if req.head().expect() {
// Handle normal requests with EXPECT: 100-Continue` header
Some(CallState::Expect {
fut: this.inner.config.expect.call(req),
})
} else {
// Handle normal requests
Some(CallState::Service {
fut: this.inner.config.service.call(req),
})
}
}
Err(res) => {
let (res, body) = res.into_parts();
*this.st = this
.inner
.send_response(res, body.into_body());
None
}
}
} else {
return Poll::Pending;
}
}
CallStateProject::None => unreachable!(),
@ -284,8 +273,8 @@ where
let io = this.inner.io();
// decode incoming bytes stream
match io.poll_recv(&this.inner.codec, cx) {
Poll::Ready(Ok(Some((mut req, pl)))) => {
match ready!(io.poll_recv(&this.inner.codec, cx)) {
Ok(Some((mut req, pl))) => {
log::trace!(
"http message is received: {:?} and payload {:?}",
req,
@ -350,13 +339,13 @@ where
);
}
}
Poll::Ready(Ok(None)) => {
Ok(None) => {
// peer is gone
log::trace!("peer is gone");
let e = DispatchError::Disconnect(None);
set_error!(this, e);
}
Poll::Ready(Err(Either::Left(err))) => {
Err(Either::Left(err)) => {
// Malformed requests, respond with 400
log::trace!("malformed request: {:?}", err);
let (res, body) =
@ -364,33 +353,29 @@ where
this.inner.error = Some(DispatchError::Parse(err));
*this.st = this.inner.send_response(res, body.into_body());
}
Poll::Ready(Err(Either::Right(err))) => {
Err(Either::Right(err)) => {
log::trace!("peer is gone with {:?}", err);
// peer is gone
let e = DispatchError::Disconnect(Some(err));
set_error!(this, e);
}
Poll::Pending => {
log::trace!("not enough data to decode http message");
return Poll::Pending;
}
}
}
// consume request's payload
State::ReadPayload => match ready!(this.inner.poll_read_payload(cx)) {
Ok(()) => {
State::ReadPayload => {
if let Err(e) = ready!(this.inner.poll_request_payload(cx)) {
set_error!(this, e);
} else {
*this.st = this.inner.switch_to_read_request();
}
Err(e) => {
set_error!(this, e);
}
},
}
// send response body
State::SendPayload { ref mut body } => {
if !this.inner.state.is_io_open() {
let e = this.inner.state.take_error().into();
set_error!(this, e);
} else if let Poll::Ready(Err(e)) = this.inner.poll_read_payload(cx)
} else if let Poll::Ready(Err(e)) =
this.inner.poll_request_payload(cx)
{
set_error!(this, e);
} else {
@ -410,7 +395,6 @@ where
let io = this.inner.io.take().unwrap();
let req = req.take().unwrap();
*this.st = State::Call;
// Handle UPGRADE request
crate::rt::spawn(
@ -597,7 +581,7 @@ where
}
/// Process request's payload
fn poll_read_payload(
fn poll_request_payload(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<(), DispatchError>> {
@ -617,11 +601,9 @@ where
payload.1.feed_data(chunk);
}
Poll::Ready(Ok(Some(PayloadItem::Eof))) => {
updated = true;
payload.1.feed_eof();
self.payload = None;
if !updated {
return Poll::Ready(Ok(()));
}
break;
}
Poll::Ready(Ok(None)) => {