mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-04 13:27:39 +03:00
update pin_project dep
This commit is contained in:
parent
b83e005aa2
commit
17d30019c9
4 changed files with 67 additions and 72 deletions
|
@ -8,7 +8,7 @@ use std::{fmt, io, mem, net};
|
|||
use bitflags::bitflags;
|
||||
use bytes::{Buf, BytesMut};
|
||||
use futures::ready;
|
||||
use pin_project::{pin_project, project};
|
||||
use pin_project::pin_project;
|
||||
|
||||
use crate::codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts};
|
||||
use crate::http::body::{Body, BodySize, MessageBody, ResponseBody};
|
||||
|
@ -124,7 +124,7 @@ enum PollRead {
|
|||
HasUpdates,
|
||||
}
|
||||
|
||||
#[pin_project]
|
||||
#[pin_project(project = CallStateProject)]
|
||||
enum CallState<S: Service, X: Service> {
|
||||
Io,
|
||||
Expect(#[pin] X::Future),
|
||||
|
@ -239,7 +239,6 @@ where
|
|||
{
|
||||
type Output = Result<(), DispatchError>;
|
||||
|
||||
#[project]
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let mut this = self.as_mut().project();
|
||||
|
||||
|
@ -260,9 +259,8 @@ where
|
|||
// process incoming stream
|
||||
this.inner.poll_read(cx)?;
|
||||
|
||||
#[project]
|
||||
let st = match this.call.project() {
|
||||
CallState::Service(mut fut) => loop {
|
||||
CallStateProject::Service(mut fut) => loop {
|
||||
// we have to loop because of
|
||||
// read back-pressure, check Poll::Pending processing
|
||||
match fut.poll(cx) {
|
||||
|
@ -287,9 +285,8 @@ where
|
|||
// restore consumed future
|
||||
this = self.as_mut().project();
|
||||
fut = {
|
||||
#[project]
|
||||
match this.call.project() {
|
||||
CallState::Service(fut) => fut,
|
||||
CallStateProject::Service(fut) => fut,
|
||||
_ => panic!(),
|
||||
}
|
||||
};
|
||||
|
@ -300,7 +297,7 @@ where
|
|||
}
|
||||
},
|
||||
// handle EXPECT call
|
||||
CallState::Expect(fut) => match fut.poll(cx) {
|
||||
CallStateProject::Expect(fut) => match fut.poll(cx) {
|
||||
Poll::Ready(result) => match result {
|
||||
Ok(req) => {
|
||||
this.inner
|
||||
|
@ -325,7 +322,7 @@ where
|
|||
return Poll::Pending;
|
||||
}
|
||||
},
|
||||
CallState::Io => CallProcess::Io,
|
||||
CallStateProject::Io => CallProcess::Io,
|
||||
};
|
||||
|
||||
let processing = match st {
|
||||
|
|
|
@ -149,7 +149,7 @@ pin_project_lite::pin_project! {
|
|||
}
|
||||
}
|
||||
|
||||
#[pin_project::pin_project]
|
||||
#[pin_project::pin_project(project = ServiceResponseStateProject)]
|
||||
enum ServiceResponseState<F, B> {
|
||||
ServiceCall(#[pin] F, Option<SendResponse<Bytes>>),
|
||||
SendPayload(SendStream<Bytes>, ResponseBody<B>),
|
||||
|
@ -230,67 +230,69 @@ where
|
|||
{
|
||||
type Output = ();
|
||||
|
||||
#[pin_project::project]
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let mut this = self.as_mut().project();
|
||||
|
||||
#[project]
|
||||
match this.state.project() {
|
||||
ServiceResponseState::ServiceCall(call, send) => match call.poll(cx) {
|
||||
Poll::Ready(Ok(res)) => {
|
||||
let (res, body) = res.into().replace_body(());
|
||||
ServiceResponseStateProject::ServiceCall(call, send) => {
|
||||
match call.poll(cx) {
|
||||
Poll::Ready(Ok(res)) => {
|
||||
let (res, body) = res.into().replace_body(());
|
||||
|
||||
let mut send = send.take().unwrap();
|
||||
let mut size = body.size();
|
||||
let h2_res = self.as_mut().prepare_response(res.head(), &mut size);
|
||||
this = self.as_mut().project();
|
||||
let mut send = send.take().unwrap();
|
||||
let mut size = body.size();
|
||||
let h2_res =
|
||||
self.as_mut().prepare_response(res.head(), &mut size);
|
||||
this = self.as_mut().project();
|
||||
|
||||
let stream = match send.send_response(h2_res, size.is_eof()) {
|
||||
Err(e) => {
|
||||
trace!("Error sending h2 response: {:?}", e);
|
||||
return Poll::Ready(());
|
||||
let stream = match send.send_response(h2_res, size.is_eof()) {
|
||||
Err(e) => {
|
||||
trace!("Error sending h2 response: {:?}", e);
|
||||
return Poll::Ready(());
|
||||
}
|
||||
Ok(stream) => stream,
|
||||
};
|
||||
|
||||
if size.is_eof() {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
this.state
|
||||
.set(ServiceResponseState::SendPayload(stream, body));
|
||||
self.poll(cx)
|
||||
}
|
||||
Ok(stream) => stream,
|
||||
};
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Err(e)) => {
|
||||
let res: Response = e.into();
|
||||
let (res, body) = res.replace_body(());
|
||||
|
||||
if size.is_eof() {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
this.state
|
||||
.set(ServiceResponseState::SendPayload(stream, body));
|
||||
self.poll(cx)
|
||||
let mut send = send.take().unwrap();
|
||||
let mut size = body.size();
|
||||
let h2_res =
|
||||
self.as_mut().prepare_response(res.head(), &mut size);
|
||||
this = self.as_mut().project();
|
||||
|
||||
let stream = match send.send_response(h2_res, size.is_eof()) {
|
||||
Err(e) => {
|
||||
trace!("Error sending h2 response: {:?}", e);
|
||||
return Poll::Ready(());
|
||||
}
|
||||
Ok(stream) => stream,
|
||||
};
|
||||
|
||||
if size.is_eof() {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
this.state.set(ServiceResponseState::SendPayload(
|
||||
stream,
|
||||
body.into_body(),
|
||||
));
|
||||
self.poll(cx)
|
||||
}
|
||||
}
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Err(e)) => {
|
||||
let res: Response = e.into();
|
||||
let (res, body) = res.replace_body(());
|
||||
|
||||
let mut send = send.take().unwrap();
|
||||
let mut size = body.size();
|
||||
let h2_res = self.as_mut().prepare_response(res.head(), &mut size);
|
||||
this = self.as_mut().project();
|
||||
|
||||
let stream = match send.send_response(h2_res, size.is_eof()) {
|
||||
Err(e) => {
|
||||
trace!("Error sending h2 response: {:?}", e);
|
||||
return Poll::Ready(());
|
||||
}
|
||||
Ok(stream) => stream,
|
||||
};
|
||||
|
||||
if size.is_eof() {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
this.state.set(ServiceResponseState::SendPayload(
|
||||
stream,
|
||||
body.into_body(),
|
||||
));
|
||||
self.poll(cx)
|
||||
}
|
||||
}
|
||||
},
|
||||
ServiceResponseState::SendPayload(stream, body) => loop {
|
||||
}
|
||||
ServiceResponseStateProject::SendPayload(stream, body) => loop {
|
||||
loop {
|
||||
if let Some(buffer) = this.buffer {
|
||||
match stream.poll_capacity(cx) {
|
||||
|
|
|
@ -579,7 +579,7 @@ where
|
|||
state: State<T, S, B, X, U>,
|
||||
}
|
||||
|
||||
#[pin_project]
|
||||
#[pin_project(project = StateProject)]
|
||||
enum State<T, S, B, X, U>
|
||||
where
|
||||
S: Service<Request = Request>,
|
||||
|
@ -618,15 +618,13 @@ where
|
|||
{
|
||||
type Output = Result<(), DispatchError>;
|
||||
|
||||
#[pin_project::project]
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.as_mut().project();
|
||||
|
||||
#[project]
|
||||
match this.state.project() {
|
||||
State::H1(disp) => disp.poll(cx),
|
||||
State::H2(ref mut disp) => Pin::new(disp).poll(cx),
|
||||
State::H2Handshake(ref mut data) => {
|
||||
StateProject::H1(disp) => disp.poll(cx),
|
||||
StateProject::H2(ref mut disp) => Pin::new(disp).poll(cx),
|
||||
StateProject::H2Handshake(ref mut data) => {
|
||||
let conn = if let Some(ref mut item) = data {
|
||||
match Pin::new(&mut item.0).poll(cx) {
|
||||
Poll::Ready(Ok(conn)) => conn,
|
||||
|
|
|
@ -177,7 +177,7 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident,
|
|||
pub mod $mod_name {
|
||||
use super::*;
|
||||
|
||||
#[pin_project::pin_project]
|
||||
#[pin_project::pin_project(project = ServiceResponseProject)]
|
||||
pub enum ServiceResponse<A: Future, $($T: Future),+> {
|
||||
V1(#[pin] A),
|
||||
$($T(#[pin] $T),)+
|
||||
|
@ -190,12 +190,10 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident,
|
|||
{
|
||||
type Output = A::Output;
|
||||
|
||||
#[pin_project::project]
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
#[project]
|
||||
match self.project() {
|
||||
ServiceResponse::V1(fut) => fut.poll(cx),
|
||||
$(ServiceResponse::$T(fut) => fut.poll(cx),)+
|
||||
ServiceResponseProject::V1(fut) => fut.poll(cx),
|
||||
$(ServiceResponseProject::$T(fut) => fut.poll(cx),)+
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue