remove unsafe

This commit is contained in:
Nikolay Kim 2020-04-02 12:05:19 +06:00
parent 7333b22801
commit 4ba4fb6168

View file

@ -156,14 +156,16 @@ where
#[pin_project::pin_project] #[pin_project::pin_project]
struct ServiceResponse<F, I, E, B> { struct ServiceResponse<F, I, E, B> {
#[pin]
state: ServiceResponseState<F, B>, state: ServiceResponseState<F, B>,
config: ServiceConfig, config: ServiceConfig,
buffer: Option<Bytes>, buffer: Option<Bytes>,
_t: PhantomData<(I, E)>, _t: PhantomData<(I, E)>,
} }
#[pin_project::pin_project]
enum ServiceResponseState<F, B> { enum ServiceResponseState<F, B> {
ServiceCall(F, Option<SendResponse<Bytes>>), ServiceCall(#[pin] F, Option<SendResponse<Bytes>>),
SendPayload(SendStream<Bytes>, ResponseBody<B>), SendPayload(SendStream<Bytes>, ResponseBody<B>),
} }
@ -245,19 +247,19 @@ where
{ {
type Output = (); type Output = ();
#[pin_project::project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
match this.state { #[project]
ServiceResponseState::ServiceCall(ref mut call, ref mut send) => { match this.state.project() {
match unsafe { Pin::new_unchecked(call) }.poll(cx) { ServiceResponseState::ServiceCall(call, send) => match call.poll(cx) {
Poll::Ready(Ok(res)) => { Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(()); let (res, body) = res.into().replace_body(());
let mut send = send.take().unwrap(); let mut send = send.take().unwrap();
let mut size = body.size(); let mut size = body.size();
let h2_res = let h2_res = self.as_mut().prepare_response(res.head(), &mut size);
self.as_mut().prepare_response(res.head(), &mut size);
this = self.as_mut().project(); this = self.as_mut().project();
let stream = match send.send_response(h2_res, size.is_eof()) { let stream = match send.send_response(h2_res, size.is_eof()) {
@ -271,8 +273,8 @@ where
if size.is_eof() { if size.is_eof() {
Poll::Ready(()) Poll::Ready(())
} else { } else {
*this.state = this.state
ServiceResponseState::SendPayload(stream, body); .set(ServiceResponseState::SendPayload(stream, body));
self.poll(cx) self.poll(cx)
} }
} }
@ -283,8 +285,7 @@ where
let mut send = send.take().unwrap(); let mut send = send.take().unwrap();
let mut size = body.size(); let mut size = body.size();
let h2_res = let h2_res = self.as_mut().prepare_response(res.head(), &mut size);
self.as_mut().prepare_response(res.head(), &mut size);
this = self.as_mut().project(); this = self.as_mut().project();
let stream = match send.send_response(h2_res, size.is_eof()) { let stream = match send.send_response(h2_res, size.is_eof()) {
@ -298,18 +299,17 @@ where
if size.is_eof() { if size.is_eof() {
Poll::Ready(()) Poll::Ready(())
} else { } else {
*this.state = ServiceResponseState::SendPayload( this.state.set(ServiceResponseState::SendPayload(
stream, stream,
body.into_body(), body.into_body(),
); ));
self.poll(cx) self.poll(cx)
} }
} }
} },
} ServiceResponseState::SendPayload(stream, body) => loop {
ServiceResponseState::SendPayload(ref mut stream, ref mut body) => loop {
loop { loop {
if let Some(ref mut buffer) = this.buffer { if let Some(buffer) = this.buffer {
match stream.poll_capacity(cx) { match stream.poll_capacity(cx) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(None) => return Poll::Ready(()),