Proper handle filter/extect calls (#186)

This commit is contained in:
Nikolay Kim 2023-03-11 12:38:17 +11:00 committed by GitHub
parent c04ce4c6c7
commit 1a2bdf33eb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 85 additions and 109 deletions

View file

@ -3,6 +3,9 @@
## [0.6.3] - unreleased
* http: Add `ClientResponse::headers_mut()` method
* http: Don't stop h1 dispatcher on upgrade handler with await #178
* web: `AppConfig` can be created with custom parameters via `new()`
## [0.6.2] - 2023-01-24

View file

@ -7,7 +7,7 @@ use crate::{service::Service, util::ready, util::BoxFuture, util::Bytes};
use crate::http;
use crate::http::body::{BodySize, MessageBody, ResponseBody};
use crate::http::config::DispatcherConfig;
use crate::http::config::{DispatcherConfig, OnRequest};
use crate::http::error::{DispatchError, ParseError, PayloadError, ResponseError};
use crate::http::message::CurrentIo;
use crate::http::request::Request;
@ -184,51 +184,36 @@ where
// so we have to send response and disconnect. request payload
// handling should be handled by service
CallStateProject::ServiceUpgrade { fut } => {
if let Poll::Ready(result) = fut.poll(cx) {
match result {
Ok(res) => {
let (msg, body) = res.into().into_parts();
let item = if let Some(item) = msg.head().take_io()
{
item
} else {
log::trace!(
"Handler service consumed io, exit"
);
return Poll::Ready(Ok(()));
};
let _ = item.0.encode(
Message::Item((msg, body.size())),
&item.1,
);
match body.size() {
BodySize::None | BodySize::Empty => {}
_ => {
log::error!("Stream responses are not supported for upgrade requests");
}
}
*this.st = State::StopIo(item);
}
Err(e) => {
log::error!(
"Cannot handle error for upgrade handler: {:?}",
e
);
match ready!(fut.poll(cx)) {
Ok(res) => {
let (msg, body) = res.into().into_parts();
let item = if let Some(item) = msg.head().take_io() {
item
} else {
log::trace!("Handler service consumed io, exit");
return Poll::Ready(Ok(()));
};
let _ = item
.0
.encode(Message::Item((msg, body.size())), &item.1);
match body.size() {
BodySize::None | BodySize::Empty => {}
_ => {
log::error!("Stream responses are not supported for upgrade requests");
}
}
*this.st = State::StopIo(item);
}
Err(e) => {
log::error!(
"Cannot handle error for upgrade handler: {:?}",
e
);
return Poll::Ready(Ok(()));
}
None
} else if this.inner.flags.contains(Flags::UPGRADE_HND) {
// continue on upgrade handler processing
return Poll::Pending;
} else if this.inner.poll_io_closed(cx) {
// check if io is closed
*this.st = State::Stop;
None
} else {
return Poll::Pending;
}
None
}
// handle EXPECT call
// expect service call must resolve before
@ -253,20 +238,9 @@ where
this = self.as_mut().project();
continue;
} else if this.inner.flags.contains(Flags::UPGRADE_HND) {
// Handle upgrade requests
let fut = this.inner.config.service.call(req);
let st = Some(CallState::ServiceUpgrade {
fut: unsafe { mem::transmute_copy(&fut) },
});
mem::forget(fut);
st
Some(this.inner.service_upgrade(req))
} else {
let fut = this.inner.config.service.call(req);
let st = Some(CallState::Service {
fut: unsafe { mem::transmute_copy(&fut) },
});
mem::forget(fut);
st
Some(this.inner.service_call(req))
}
}
Err(e) => {
@ -282,30 +256,12 @@ where
.codec
.set_ctype(req.head().connection_type());
if req.head().expect() {
// Handle normal requests with EXPECT: 100-Continue` header
let fut = this.inner.config.expect.call(req);
let st = Some(CallState::Expect {
fut: unsafe { mem::transmute_copy(&fut) },
});
mem::forget(fut);
st
Some(this.inner.service_expect(req))
} else if this.inner.flags.contains(Flags::UPGRADE_HND)
{
// Handle upgrade requests
let fut = this.inner.config.service.call(req);
let st = Some(CallState::ServiceUpgrade {
fut: unsafe { mem::transmute_copy(&fut) },
});
mem::forget(fut);
st
Some(this.inner.service_upgrade(req))
} else {
// Handle normal requests
let fut = this.inner.config.service.call(req);
let st = Some(CallState::Service {
fut: unsafe { mem::transmute_copy(&fut) },
});
mem::forget(fut);
st
Some(this.inner.service_call(req))
}
}
Err(res) => {
@ -459,6 +415,52 @@ where
}
}
fn service_call(&self, req: Request) -> CallState<S, X> {
// Handle normal requests
let fut = self.config.service.call(req);
let st = CallState::Service {
fut: unsafe { mem::transmute_copy(&fut) },
};
mem::forget(fut);
st
}
fn service_filter(&self, req: Request, f: &OnRequest) -> CallState<S, X> {
// Handle filter fut
let fut = f.call((req, self.io.get_ref()));
let st = CallState::Filter {
fut: unsafe { mem::transmute_copy(&fut) },
};
mem::forget(fut);
st
}
fn service_expect(&self, req: Request) -> CallState<S, X> {
// Handle normal requests with EXPECT: 100-Continue` header
let fut = self.config.expect.call(req);
let st = CallState::Expect {
fut: unsafe { mem::transmute_copy(&fut) },
};
mem::forget(fut);
st
}
fn service_upgrade(&mut self, mut req: Request) -> CallState<S, X> {
// Move io into request
let io: IoBoxed = self.io.take().into();
req.head_mut().io = CurrentIo::Io(Rc::new((
io.get_ref(),
RefCell::new(Some(Box::new((io, self.codec.clone())))),
)));
// Handle upgrade requests
let fut = self.config.service.call(req);
let st = CallState::ServiceUpgrade {
fut: unsafe { mem::transmute_copy(&fut) },
};
mem::forget(fut);
st
}
fn read_request(
&mut self,
cx: &mut Context<'_>,
@ -509,46 +511,17 @@ where
} else {
if req.upgrade() {
self.flags.insert(Flags::UPGRADE_HND);
let io: IoBoxed = self.io.take().into();
req.head_mut().io = CurrentIo::Io(Rc::new((
io.get_ref(),
RefCell::new(Some(Box::new((io, self.codec.clone())))),
)));
} else {
req.head_mut().io = CurrentIo::Ref(self.io.get_ref());
}
call_state.set(if let Some(ref f) = self.config.on_request {
// Handle filter fut
let fut = f.call((req, self.io.get_ref()));
let st = CallState::Filter {
fut: unsafe { mem::transmute_copy(&fut) },
};
mem::forget(fut);
st
self.service_filter(req, f)
} else if req.head().expect() {
// Handle normal requests with EXPECT: 100-Continue` header
let fut = self.config.expect.call(req);
let st = CallState::Expect {
fut: unsafe { mem::transmute_copy(&fut) },
};
mem::forget(fut);
st
self.service_expect(req)
} else if self.flags.contains(Flags::UPGRADE_HND) {
// Handle upgrade requests
let fut = self.config.service.call(req);
let st = CallState::ServiceUpgrade {
fut: unsafe { mem::transmute_copy(&fut) },
};
mem::forget(fut);
st
self.service_upgrade(req)
} else {
// Handle normal requests
let fut = self.config.service.call(req);
let st = CallState::Service {
fut: unsafe { mem::transmute_copy(&fut) },
};
mem::forget(fut);
st
self.service_call(req)
});
Poll::Ready(State::Call)
}