Update deps (#228)

* Update bitflags crate

* Upgrade ntex-h2
This commit is contained in:
Nikolay Kim 2023-10-09 20:59:04 +06:00 committed by GitHub
parent e3eeaee542
commit 8c3e8ebfd9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 176 additions and 287 deletions

View file

@ -18,7 +18,7 @@ default = []
simd = ["simdutf8"] simd = ["simdutf8"]
[dependencies] [dependencies]
bitflags = "1.3" bitflags = "2.4"
bytes = "1.0.0" bytes = "1.0.0"
serde = "1.0.0" serde = "1.0.0"
futures-core = { version = "0.3", default-features = false, features = ["alloc"] } futures-core = { version = "0.3", default-features = false, features = ["alloc"] }

View file

@ -979,7 +979,7 @@ impl PartialEq for Bytes {
impl PartialOrd for Bytes { impl PartialOrd for Bytes {
fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> { fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
self.inner.as_ref().partial_cmp(other.inner.as_ref()) Some(self.cmp(other))
} }
} }

View file

@ -26,6 +26,7 @@ pub struct BufParams {
} }
bitflags::bitflags! { bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 { struct Flags: u8 {
const SPAWNED = 0b0000_0001; const SPAWNED = 0b0000_0001;
const INCREASED = 0b0000_0010; const INCREASED = 0b0000_0010;
@ -116,8 +117,7 @@ impl PoolId {
where where
T: Fn(Pin<Box<dyn Future<Output = ()>>>) + 'static, T: Fn(Pin<Box<dyn Future<Output = ()>>>) + 'static,
{ {
let spawn: Rc<dyn Fn(Pin<Box<dyn Future<Output = ()>>>)> = let spawn: Rc<dyn Fn(Pin<Box<dyn Future<Output = ()>>>)> = Rc::new(f);
Rc::new(move |fut| f(fut));
POOLS.with(move |pools| { POOLS.with(move |pools| {
*pools[self.0 as usize].spawn.borrow_mut() = Some(spawn.clone()); *pools[self.0 as usize].spawn.borrow_mut() = Some(spawn.clone());
@ -131,8 +131,7 @@ impl PoolId {
where where
T: Fn(Pin<Box<dyn Future<Output = ()>>>) + 'static, T: Fn(Pin<Box<dyn Future<Output = ()>>>) + 'static,
{ {
let spawn: Rc<dyn Fn(Pin<Box<dyn Future<Output = ()>>>)> = let spawn: Rc<dyn Fn(Pin<Box<dyn Future<Output = ()>>>)> = Rc::new(f);
Rc::new(move |fut| f(fut));
POOLS.with(move |pools| { POOLS.with(move |pools| {
for pool in pools.iter().take(15) { for pool in pools.iter().take(15) {

View file

@ -545,7 +545,7 @@ impl PartialEq for HeaderValue {
impl PartialOrd for HeaderValue { impl PartialOrd for HeaderValue {
#[inline] #[inline]
fn partial_cmp(&self, other: &HeaderValue) -> Option<cmp::Ordering> { fn partial_cmp(&self, other: &HeaderValue) -> Option<cmp::Ordering> {
self.inner.partial_cmp(&other.inner) Some(self.cmp(other))
} }
} }

View file

@ -21,7 +21,7 @@ ntex-bytes = "0.1.19"
ntex-util = "0.3.2" ntex-util = "0.3.2"
ntex-service = "1.2.6" ntex-service = "1.2.6"
bitflags = "1.3" bitflags = "2.4"
log = "0.4" log = "0.4"
pin-project-lite = "0.2" pin-project-lite = "0.2"

View file

@ -25,6 +25,7 @@ pin_project_lite::pin_project! {
} }
bitflags::bitflags! { bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 { struct Flags: u8 {
const READY_ERR = 0b0001; const READY_ERR = 0b0001;
const IO_ERR = 0b0010; const IO_ERR = 0b0010;

View file

@ -14,6 +14,7 @@ use crate::tasks::{ReadContext, WriteContext};
use crate::{FilterLayer, Handle, IoStatusUpdate, IoStream, RecvError}; use crate::{FilterLayer, Handle, IoStatusUpdate, IoStream, RecvError};
bitflags::bitflags! { bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Flags: u16 { pub struct Flags: u16 {
/// io is closed /// io is closed
const IO_STOPPED = 0b0000_0000_0000_0001; const IO_STOPPED = 0b0000_0000_0000_0001;

View file

@ -37,6 +37,7 @@ pub struct IoTest {
} }
bitflags::bitflags! { bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct IoTestFlags: u8 { struct IoTestFlags: u8 {
const FLUSHED = 0b0000_0001; const FLUSHED = 0b0000_0001;
const CLOSED = 0b0000_0010; const CLOSED = 0b0000_0010;
@ -565,7 +566,7 @@ impl Future for WriteTask {
log::trace!("write task is stopped"); log::trace!("write task is stopped");
return Poll::Ready(()); return Poll::Ready(());
} }
Poll::Ready(Ok(n)) if n == 0 => { Poll::Ready(Ok(0)) => {
this.state.close(None); this.state.close(None);
log::trace!("write task is stopped"); log::trace!("write task is stopped");
return Poll::Ready(()); return Poll::Ready(());

View file

@ -38,7 +38,7 @@ pub(crate) fn register(timeout: Duration, io: &IoRef) -> Instant {
inner inner
.notifications .notifications
.entry(expire) .entry(expire)
.or_insert_with(HashSet::default) .or_default()
.insert(io.0.clone()); .insert(io.0.clone());
if !inner.running { if !inner.running {

View file

@ -18,7 +18,7 @@ path = "src/lib.rs"
[dependencies] [dependencies]
ntex-rt = "0.4.7" ntex-rt = "0.4.7"
ntex-service = "1.2.6" ntex-service = "1.2.6"
bitflags = "1.3" bitflags = "2.4"
fxhash = "0.2.1" fxhash = "0.2.1"
log = "0.4" log = "0.4"
slab = "0.4" slab = "0.4"

View file

@ -26,6 +26,7 @@ impl<T> fmt::Debug for Pool<T> {
} }
bitflags::bitflags! { bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 { struct Flags: u8 {
const SENDER = 0b0000_0001; const SENDER = 0b0000_0001;
const RECEIVER = 0b0000_0010; const RECEIVER = 0b0000_0010;

View file

@ -131,6 +131,7 @@ impl Drop for TimerHandle {
} }
bitflags::bitflags! { bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Flags: u8 { pub struct Flags: u8 {
const DRIVER_STARTED = 0b0000_0001; const DRIVER_STARTED = 0b0000_0001;
const DRIVER_RECALC = 0b0000_0010; const DRIVER_RECALC = 0b0000_0010;

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.7.6] - 2023-10-xx
* Upgrade ntex-h2 to 0.4
## [0.7.5] - 2023-10-01 ## [0.7.5] - 2023-10-01
* Fix compile error for 'compress' feature with async-std & glommio #226 * Fix compile error for 'compress' feature with async-std & glommio #226

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex" name = "ntex"
version = "0.7.5" version = "0.7.6"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services" description = "Framework for composable network services"
readme = "README.md" readme = "README.md"
@ -56,7 +56,7 @@ ntex-service = "1.2.6"
ntex-macros = "0.1.3" ntex-macros = "0.1.3"
ntex-util = "0.3.2" ntex-util = "0.3.2"
ntex-bytes = "0.1.19" ntex-bytes = "0.1.19"
ntex-h2 = "0.3.2" ntex-h2 = "0.4.2"
ntex-rt = "0.4.9" ntex-rt = "0.4.9"
ntex-io = "0.3.4" ntex-io = "0.3.4"
ntex-tls = "0.3.1" ntex-tls = "0.3.1"
@ -67,7 +67,7 @@ ntex-async-std = { version = "0.3.0", optional = true }
async-oneshot = "0.5.0" async-oneshot = "0.5.0"
async-channel = "1.8.0" async-channel = "1.8.0"
base64 = "0.21" base64 = "0.21"
bitflags = "1.3" bitflags = "2.4"
log = "0.4" log = "0.4"
num_cpus = "1.13" num_cpus = "1.13"
nanorand = { version = "0.7.0", default-features = false, features = ["std", "wyrand"] } nanorand = { version = "0.7.0", default-features = false, features = ["std", "wyrand"] }

View file

@ -1,18 +1,17 @@
use std::{cell::RefCell, io, rc::Rc}; use std::io;
use ntex_h2::{self as h2, client::Client, frame}; use ntex_h2::{self as h2, client::SimpleClient, frame};
use crate::http::body::{BodySize, MessageBody}; use crate::http::body::{BodySize, MessageBody};
use crate::http::header::{self, HeaderMap, HeaderValue}; use crate::http::header::{self, HeaderMap, HeaderValue};
use crate::http::message::{RequestHeadType, ResponseHead}; use crate::http::message::{RequestHeadType, ResponseHead};
use crate::http::{h2::payload, payload::Payload, Method, Version}; use crate::http::{h2::payload, payload::Payload, Method, Version};
use crate::util::{poll_fn, ByteString, Bytes, HashMap, Ready}; use crate::util::{poll_fn, ByteString, Bytes};
use crate::{channel::oneshot, Service, ServiceCtx};
use super::error::SendRequestError; use super::error::{ConnectError, SendRequestError};
pub(super) async fn send_request<B>( pub(super) async fn send_request<B>(
mut client: H2Client, client: H2Client,
head: RequestHeadType, head: RequestHeadType,
body: B, body: B,
) -> Result<(ResponseHead, Payload), SendRequestError> ) -> Result<(ResponseHead, Payload), SendRequestError>
@ -69,36 +68,129 @@ where
.path_and_query() .path_and_query()
.map(|p| ByteString::from(format!("{}", p))) .map(|p| ByteString::from(format!("{}", p)))
.unwrap_or_else(|| ByteString::from(uri.path())); .unwrap_or_else(|| ByteString::from(uri.path()));
let stream = client let (snd_stream, rcv_stream) = client
.inner
.client .client
.send_request(head.as_ref().method.clone(), path, hdrs, eof) .send(head.as_ref().method.clone(), path, hdrs, eof)
.await?; .await?;
// send body // send body
let id = stream.id(); if !eof {
if eof {
let result = client.wait_response(id).await;
client.set_stream(stream);
result
} else {
// sending body is async process, we can handle upload and download // sending body is async process, we can handle upload and download
// at the same time // at the same time
let c = client.clone();
crate::rt::spawn(async move { crate::rt::spawn(async move {
if let Err(e) = send_body(body, &stream).await { if let Err(e) = send_body(body, &snd_stream).await {
c.set_error(stream.id(), e); log::error!("Cannot send body: {:?}", e);
} else { snd_stream.reset(frame::Reason::INTERNAL_ERROR);
c.set_stream(stream);
} }
}); });
client.wait_response(id).await }
let h2::Message { stream, kind } = rcv_stream
.recv()
.await
.ok_or(SendRequestError::Connect(ConnectError::Disconnected(None)))?;
match kind {
h2::MessageKind::Headers {
pseudo,
headers,
eof,
} => {
log::trace!(
"{:?} got response (eof: {}): {:#?}\nheaders: {:#?}",
stream.id(),
eof,
pseudo,
headers
);
match pseudo.status {
Some(status) => {
let mut head = ResponseHead::new(status);
head.headers = headers;
head.version = Version::HTTP_2;
let payload = if !eof {
log::debug!("Creating local payload stream for {:?}", stream.id());
let (mut pl, payload) =
payload::Payload::create(stream.empty_capacity());
crate::rt::spawn(async move {
loop {
let h2::Message { stream, kind } =
match rcv_stream.recv().await {
Some(msg) => msg,
None => {
pl.feed_eof(Bytes::new());
break;
}
};
match kind {
h2::MessageKind::Data(data, cap) => {
log::debug!(
"Got data chunk for {:?}: {:?}",
stream.id(),
data.len()
);
pl.feed_data(data, cap);
}
h2::MessageKind::Eof(item) => {
log::debug!(
"Got payload eof for {:?}: {:?}",
stream.id(),
item
);
match item {
h2::StreamEof::Data(data) => {
pl.feed_eof(data);
}
h2::StreamEof::Trailers(_) => {
pl.feed_eof(Bytes::new());
}
h2::StreamEof::Error(err) => {
pl.set_error(err.into())
}
}
}
h2::MessageKind::Disconnect(err) => {
log::debug!("Connection is disconnected {:?}", err);
pl.set_error(
io::Error::new(io::ErrorKind::Other, err)
.into(),
);
}
_ => {
pl.set_error(
io::Error::new(
io::ErrorKind::Other,
"unexpected h2 message",
)
.into(),
);
break;
}
}
}
});
Payload::H2(payload)
} else {
Payload::None
};
Ok((head, payload))
}
None => Err(SendRequestError::H2(h2::OperationError::Connection(
h2::ConnectionError::MissingPseudo("Status"),
))),
}
}
_ => Err(SendRequestError::Error(Box::new(io::Error::new(
io::ErrorKind::Other,
"unexpected h2 message",
)))),
} }
} }
async fn send_body<B: MessageBody>( async fn send_body<B: MessageBody>(
mut body: B, mut body: B,
stream: &h2::Stream, stream: &h2::client::SendStream,
) -> Result<(), SendRequestError> { ) -> Result<(), SendRequestError> {
loop { loop {
match poll_fn(|cx| body.poll_next_chunk(cx)).await { match poll_fn(|cx| body.poll_next_chunk(cx)).await {
@ -116,214 +208,21 @@ async fn send_body<B: MessageBody>(
} }
} }
#[derive(Clone)]
pub(super) struct H2Client { pub(super) struct H2Client {
inner: Rc<H2ClientInner>, client: SimpleClient,
wait_id: Option<frame::StreamId>,
}
impl Clone for H2Client {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
wait_id: None,
}
}
} }
impl H2Client { impl H2Client {
pub(super) fn new(client: Client) -> Self { pub(super) fn new(client: SimpleClient) -> Self {
Self { Self { client }
wait_id: None,
inner: Rc::new(H2ClientInner {
client,
streams: RefCell::new(HashMap::default()),
}),
}
} }
pub(super) fn close(&self) { pub(super) fn close(&self) {
self.inner.client.close() self.client.close()
} }
pub(super) fn is_closed(&self) -> bool { pub(super) fn is_closed(&self) -> bool {
self.inner.client.is_closed() self.client.is_closed()
}
fn set_error(&self, id: frame::StreamId, err: SendRequestError) {
if let Some(mut info) = self.inner.streams.borrow_mut().remove(&id) {
if let Some(tx) = info.tx.take() {
let _ = tx.send(Err(err));
}
}
}
fn set_stream(&self, stream: h2::Stream) {
if let Some(info) = self.inner.streams.borrow_mut().get_mut(&stream.id()) {
// response is not received yet
if info.tx.is_some() {
info.stream = Some(stream);
} else if let Some(ref mut sender) = info.payload {
sender.set_stream(Some(stream));
}
}
}
async fn wait_response(
&mut self,
id: frame::StreamId,
) -> Result<(ResponseHead, Payload), SendRequestError> {
let (tx, rx) = oneshot::channel();
let info = StreamInfo {
tx: Some(tx),
stream: None,
payload: None,
};
self.wait_id = Some(id);
self.inner.streams.borrow_mut().insert(id, info);
let result = match rx.await {
Ok(item) => item,
Err(_) => Err(SendRequestError::Error(Box::new(io::Error::new(
io::ErrorKind::Other,
"disconnected",
)))),
};
self.wait_id = None;
result
}
}
impl Drop for H2Client {
fn drop(&mut self) {
if let Some(id) = self.wait_id.take() {
self.inner.streams.borrow_mut().remove(&id);
}
}
}
struct H2ClientInner {
client: Client,
streams: RefCell<HashMap<frame::StreamId, StreamInfo>>,
}
#[derive(Debug)]
struct StreamInfo {
tx: Option<oneshot::Sender<Result<(ResponseHead, Payload), SendRequestError>>>,
stream: Option<h2::Stream>,
payload: Option<payload::PayloadSender>,
}
pub(super) struct H2PublishService(Rc<H2ClientInner>);
impl H2PublishService {
pub(super) fn new(client: H2Client) -> Self {
Self(client.inner.clone())
}
}
impl Service<h2::Message> for H2PublishService {
type Response = ();
type Error = &'static str;
type Future<'f> = Ready<Self::Response, Self::Error>;
fn call<'a>(
&'a self,
mut msg: h2::Message,
_: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
match msg.kind().take() {
h2::MessageKind::Headers {
pseudo,
headers,
eof,
} => {
log::trace!(
"{:?} got response (eof: {}): {:#?}\nheaders: {:#?}",
msg.id(),
eof,
pseudo,
headers
);
let status = match pseudo.status {
Some(status) => status,
None => {
if let Some(mut info) =
self.0.streams.borrow_mut().remove(&msg.id())
{
let _ = info.tx.take().unwrap().send(Err(
SendRequestError::H2(h2::OperationError::Connection(
h2::ConnectionError::MissingPseudo("Status"),
)),
));
}
return Ready::Err("Missing status header");
}
};
let mut head = ResponseHead::new(status);
head.headers = headers;
head.version = Version::HTTP_2;
if let Some(info) = self.0.streams.borrow_mut().get_mut(&msg.id()) {
let stream = info.stream.take();
let payload = if !eof {
log::debug!("Creating local payload stream for {:?}", msg.id());
let (sender, payload) =
payload::Payload::create(msg.stream().empty_capacity());
sender.set_stream(stream);
info.payload = Some(sender);
Payload::H2(payload)
} else {
Payload::None
};
let _ = info.tx.take().unwrap().send(Ok((head, payload)));
Ready::Ok(())
} else {
Ready::Err("Cannot find Stream info")
}
}
h2::MessageKind::Data(data, cap) => {
log::debug!("Got data chunk for {:?}: {:?}", msg.id(), data.len());
if let Some(info) = self.0.streams.borrow_mut().get_mut(&msg.id()) {
if let Some(ref mut pl) = info.payload {
pl.feed_data(data, cap);
}
Ready::Ok(())
} else {
log::error!("Payload stream does not exists for {:?}", msg.id());
Ready::Err("Cannot find Stream info")
}
}
h2::MessageKind::Eof(item) => {
log::debug!("Got payload eof for {:?}: {:?}", msg.id(), item);
if let Some(mut info) = self.0.streams.borrow_mut().remove(&msg.id()) {
if let Some(ref mut pl) = info.payload {
match item {
h2::StreamEof::Data(data) => {
pl.feed_eof(data);
}
h2::StreamEof::Trailers(_) => {
pl.feed_eof(Bytes::new());
}
h2::StreamEof::Error(err) => pl.set_error(err.into()),
}
}
Ready::Ok(())
} else {
Ready::Err("Cannot find Stream info")
}
}
h2::MessageKind::Disconnect(err) => {
log::debug!("Connection is disconnected {:?}", err);
if let Some(mut info) = self.0.streams.borrow_mut().remove(&msg.id()) {
if let Some(ref mut pl) = info.payload {
pl.set_error(io::Error::new(io::ErrorKind::Other, err).into());
}
}
Ready::Ok(())
}
h2::MessageKind::Empty => Ready::Ok(()),
}
} }
} }

View file

@ -12,8 +12,7 @@ use crate::util::{ready, BoxFuture, ByteString, HashMap, HashSet};
use crate::{channel::pool, rt::spawn, task::LocalWaker}; use crate::{channel::pool, rt::spawn, task::LocalWaker};
use super::connection::{Connection, ConnectionType}; use super::connection::{Connection, ConnectionType};
use super::h2proto::{H2Client, H2PublishService}; use super::{error::ConnectError, h2proto::H2Client, Connect};
use super::{error::ConnectError, Connect};
#[derive(Hash, Eq, PartialEq, Clone, Debug)] #[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub(super) struct Key { pub(super) struct Key {
@ -202,7 +201,7 @@ impl Waiters {
let key: Key = connect.uri.authority().unwrap().clone().into(); let key: Key = connect.uri.authority().unwrap().clone().into();
self.waiters self.waiters
.entry(key) .entry(key)
.or_insert_with(VecDeque::new) .or_default()
.push_back((connect, tx)); .push_back((connect, tx));
rx rx
} }
@ -472,24 +471,13 @@ where
ByteString::new() ByteString::new()
}; };
let connection = h2::client::ClientConnection::with_params( let client = h2::client::SimpleClient::new(
io, io,
this.inner.borrow().h2config.clone(), this.inner.borrow().h2config.clone(),
this.uri.scheme() == Some(&Scheme::HTTPS), this.uri.scheme().cloned().unwrap_or(Scheme::HTTPS),
auth, auth,
); );
let client = H2Client::new(connection.client()); let client = H2Client::new(client);
let key = this.key.clone();
let publish = H2PublishService::new(client.clone());
crate::rt::spawn(async move {
let res = connection.start(publish).await;
log::trace!(
"Http/2 connection is closed for {:?} with {:?}",
key.authority,
res
);
});
let guard = this.guard.take().unwrap().consume(); let guard = this.guard.take().unwrap().consume();
let conn = Connection::new( let conn = Connection::new(
ConnectionType::H2(client.clone()), ConnectionType::H2(client.clone()),

View file

@ -14,6 +14,7 @@ use super::decoder::{PayloadDecoder, PayloadItem, PayloadType};
use super::{decoder, encoder, reserve_readbuf, Message, MessageType}; use super::{decoder, encoder, reserve_readbuf, Message, MessageType};
bitflags! { bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 { struct Flags: u8 {
const HEAD = 0b0000_0001; const HEAD = 0b0000_0001;
const KEEPALIVE_ENABLED = 0b0000_1000; const KEEPALIVE_ENABLED = 0b0000_1000;

View file

@ -15,6 +15,7 @@ use crate::util::BytesMut;
use super::{decoder, decoder::PayloadType, encoder, Message}; use super::{decoder, decoder::PayloadType, encoder, Message};
bitflags! { bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 { struct Flags: u8 {
const HEAD = 0b0000_0001; const HEAD = 0b0000_0001;
const STREAM = 0b0000_0010; const STREAM = 0b0000_0010;

View file

@ -19,6 +19,7 @@ use super::payload::{Payload, PayloadSender, PayloadStatus};
use super::{codec::Codec, Message}; use super::{codec::Codec, Message};
bitflags::bitflags! { bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Flags: u16 { pub struct Flags: u16 {
/// We parsed one complete request message /// We parsed one complete request message
const STARTED = 0b0000_0001; const STARTED = 0b0000_0001;

View file

@ -278,21 +278,18 @@ where
Ready<Self::Response, Self::Error>, Ready<Self::Response, Self::Error>,
>; >;
fn call<'a>( fn call<'a>(&'a self, msg: h2::Message, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
&'a self, let h2::Message { stream, kind } = msg;
mut msg: h2::Message, let (io, pseudo, headers, eof, payload) = match kind {
_: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
let (io, pseudo, headers, eof, payload) = match msg.kind().take() {
h2::MessageKind::Headers { h2::MessageKind::Headers {
pseudo, pseudo,
headers, headers,
eof, eof,
} => { } => {
let pl = if !eof { let pl = if !eof {
log::debug!("Creating local payload stream for {:?}", msg.id()); log::debug!("Creating local payload stream for {:?}", stream.id());
let (sender, payload) = Payload::create(msg.stream().empty_capacity()); let (sender, payload) = Payload::create(stream.empty_capacity());
self.streams.borrow_mut().insert(msg.id(), sender); self.streams.borrow_mut().insert(stream.id(), sender);
Some(payload) Some(payload)
} else { } else {
None None
@ -300,17 +297,17 @@ where
(self.io.clone(), pseudo, headers, eof, pl) (self.io.clone(), pseudo, headers, eof, pl)
} }
h2::MessageKind::Data(data, cap) => { h2::MessageKind::Data(data, cap) => {
log::debug!("Got data chunk for {:?}: {:?}", msg.id(), data.len()); log::debug!("Got data chunk for {:?}: {:?}", stream.id(), data.len());
if let Some(sender) = self.streams.borrow_mut().get_mut(&msg.id()) { if let Some(sender) = self.streams.borrow_mut().get_mut(&stream.id()) {
sender.feed_data(data, cap) sender.feed_data(data, cap)
} else { } else {
log::error!("Payload stream does not exists for {:?}", msg.id()); log::error!("Payload stream does not exists for {:?}", stream.id());
}; };
return Either::Right(Ready::Ok(())); return Either::Right(Ready::Ok(()));
} }
h2::MessageKind::Eof(item) => { h2::MessageKind::Eof(item) => {
log::debug!("Got payload eof for {:?}: {:?}", msg.id(), item); log::debug!("Got payload eof for {:?}: {:?}", stream.id(), item);
if let Some(mut sender) = self.streams.borrow_mut().remove(&msg.id()) { if let Some(mut sender) = self.streams.borrow_mut().remove(&stream.id()) {
match item { match item {
h2::StreamEof::Data(data) => { h2::StreamEof::Data(data) => {
sender.feed_eof(data); sender.feed_eof(data);
@ -325,12 +322,11 @@ where
} }
h2::MessageKind::Disconnect(err) => { h2::MessageKind::Disconnect(err) => {
log::debug!("Connection is disconnected {:?}", err); log::debug!("Connection is disconnected {:?}", err);
if let Some(mut sender) = self.streams.borrow_mut().remove(&msg.id()) { if let Some(mut sender) = self.streams.borrow_mut().remove(&stream.id()) {
sender.set_error(io::Error::new(io::ErrorKind::Other, err).into()); sender.set_error(io::Error::new(io::ErrorKind::Other, err).into());
} }
return Either::Right(Ready::Ok(())); return Either::Right(Ready::Ok(()));
} }
h2::MessageKind::Empty => return Either::Right(Ready::Ok(())),
}; };
let cfg = self.config.clone(); let cfg = self.config.clone();
@ -338,7 +334,7 @@ where
Either::Left(Box::pin(async move { Either::Left(Box::pin(async move {
log::trace!( log::trace!(
"{:?} got request (eof: {}): {:#?}\nheaders: {:#?}", "{:?} got request (eof: {}): {:#?}\nheaders: {:#?}",
msg.id(), stream.id(),
eof, eof,
pseudo, pseudo,
headers headers
@ -381,25 +377,25 @@ where
let hdrs = mem::replace(&mut head.headers, HeaderMap::new()); let hdrs = mem::replace(&mut head.headers, HeaderMap::new());
if size.is_eof() || is_head_req { if size.is_eof() || is_head_req {
msg.stream().send_response(head.status, hdrs, true)?; stream.send_response(head.status, hdrs, true)?;
} else { } else {
msg.stream().send_response(head.status, hdrs, false)?; stream.send_response(head.status, hdrs, false)?;
loop { loop {
match poll_fn(|cx| body.poll_next_chunk(cx)).await { match poll_fn(|cx| body.poll_next_chunk(cx)).await {
None => { None => {
log::debug!("{:?} closing payload stream", msg.id()); log::debug!("{:?} closing payload stream", stream.id());
msg.stream().send_payload(Bytes::new(), true).await?; stream.send_payload(Bytes::new(), true).await?;
break; break;
} }
Some(Ok(chunk)) => { Some(Ok(chunk)) => {
log::debug!( log::debug!(
"{:?} sending data chunk {:?} bytes", "{:?} sending data chunk {:?} bytes",
msg.id(), stream.id(),
chunk.len() chunk.len()
); );
if !chunk.is_empty() { if !chunk.is_empty() {
msg.stream().send_payload(chunk, false).await?; stream.send_payload(chunk, false).await?;
} }
} }
Some(Err(e)) => { Some(Err(e)) => {

View file

@ -19,6 +19,7 @@ pub enum ConnectionType {
} }
bitflags! { bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct Flags: u8 { pub(crate) struct Flags: u8 {
const CLOSE = 0b0000_0001; const CLOSE = 0b0000_0001;
const KEEP_ALIVE = 0b0000_0010; const KEEP_ALIVE = 0b0000_0010;

View file

@ -9,7 +9,7 @@
#![warn( #![warn(
rust_2018_idioms, rust_2018_idioms,
unreachable_pub, unreachable_pub,
missing_debug_implementations, // missing_debug_implementations,
// missing_docs, // missing_docs,
)] )]
#![allow( #![allow(

View file

@ -105,11 +105,7 @@ where
let filter_fut = self.filter.create(()); let filter_fut = self.filter.create(());
let state_factories = self.state_factories.clone(); let state_factories = self.state_factories.clone();
let mut extensions = self let mut extensions = self.extensions.borrow_mut().take().unwrap_or_default();
.extensions
.borrow_mut()
.take()
.unwrap_or_else(Extensions::new);
let middleware = self.middleware.clone(); let middleware = self.middleware.clone();
let external = std::mem::take(&mut *self.external.borrow_mut()); let external = std::mem::take(&mut *self.external.borrow_mut());

View file

@ -204,7 +204,7 @@ where
self.external.extend(cfg.external); self.external.extend(cfg.external);
if !cfg.state.is_empty() { if !cfg.state.is_empty() {
let mut state = self.state.unwrap_or_else(Extensions::new); let mut state = self.state.unwrap_or_default();
state.extend(cfg.state); state.extend(cfg.state);
self.state = Some(state); self.state = Some(state);
} }

View file

@ -137,11 +137,7 @@ where
let server_mode = self.server_mode; let server_mode = self.server_mode;
let to = self.timeout; let to = self.timeout;
let keepalive_timeout = self.keepalive_timeout; let keepalive_timeout = self.keepalive_timeout;
let mut headers = self let mut headers = self.extra_headers.borrow_mut().take().unwrap_or_default();
.extra_headers
.borrow_mut()
.take()
.unwrap_or_else(HeaderMap::new);
// Generate a random key for the `Sec-WebSocket-Key` header. // Generate a random key for the `Sec-WebSocket-Key` header.
// a base64-encoded (see Section 4 of [RFC4648]) value that, // a base64-encoded (see Section 4 of [RFC4648]) value that,

View file

@ -58,6 +58,7 @@ pub struct Codec {
} }
bitflags::bitflags! { bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 { struct Flags: u8 {
const SERVER = 0b0000_0001; const SERVER = 0b0000_0001;
const R_CONTINUATION = 0b0000_0010; const R_CONTINUATION = 0b0000_0010;

View file

@ -8,6 +8,7 @@ use crate::util::{BufMut, PoolRef, Ready};
use super::{CloseCode, CloseReason, Codec, Frame, Item, Message}; use super::{CloseCode, CloseReason, Codec, Frame, Item, Message};
bitflags::bitflags! { bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 { struct Flags: u8 {
const CLOSED = 0b0001; const CLOSED = 0b0001;
const CONTINUATION = 0b0010; const CONTINUATION = 0b0010;