change recv/poll_recv api

This commit is contained in:
Nikolay Kim 2021-12-22 02:13:02 +06:00
parent 2e7547948e
commit fd97208a01
17 changed files with 119 additions and 168 deletions

View file

@ -221,21 +221,21 @@ where
Poll::Ready(()) => { Poll::Ready(()) => {
// decode incoming bytes if buffer is ready // decode incoming bytes if buffer is ready
match io.poll_recv(&slf.shared.codec, cx) { match io.poll_recv(&slf.shared.codec, cx) {
Poll::Ready(Some(Ok(el))) => { Poll::Ready(Ok(Some(el))) => {
slf.update_keepalive(); slf.update_keepalive();
DispatchItem::Item(el) DispatchItem::Item(el)
} }
Poll::Ready(Some(Err(Either::Left(err)))) => { Poll::Ready(Err(Either::Left(err))) => {
slf.st.set(DispatcherState::Stop); slf.st.set(DispatcherState::Stop);
slf.unregister_keepalive(); slf.unregister_keepalive();
DispatchItem::DecoderError(err) DispatchItem::DecoderError(err)
} }
Poll::Ready(Some(Err(Either::Right(err)))) => { Poll::Ready(Err(Either::Right(err))) => {
slf.st.set(DispatcherState::Stop); slf.st.set(DispatcherState::Stop);
slf.unregister_keepalive(); slf.unregister_keepalive();
DispatchItem::Disconnect(Some(err)) DispatchItem::Disconnect(Some(err))
} }
Poll::Ready(None) => { Poll::Ready(Ok(None)) => {
DispatchItem::Disconnect(None) DispatchItem::Disconnect(None)
} }
Poll::Pending => { Poll::Pending => {

View file

@ -410,7 +410,7 @@ impl<F> Io<F> {
pub async fn recv<U>( pub async fn recv<U>(
&self, &self,
codec: &U, codec: &U,
) -> Option<Result<U::Item, Either<U::Error, io::Error>>> ) -> Result<Option<U::Item>, Either<U::Error, io::Error>>
where where
U: Decoder, U: Decoder,
{ {
@ -418,8 +418,8 @@ impl<F> Io<F> {
} }
#[inline] #[inline]
/// Wake read task and instruct to read more data /// Wait until read becomes ready.
pub async fn read_ready(&self) -> Option<io::Result<()>> { pub async fn read_ready(&self) -> io::Result<Option<()>> {
poll_fn(|cx| self.poll_read_ready(cx)).await poll_fn(|cx| self.poll_read_ready(cx)).await
} }
@ -442,8 +442,8 @@ impl<F> Io<F> {
/// Encode item, send to a peer /// Encode item, send to a peer
pub async fn send<U>( pub async fn send<U>(
&self, &self,
item: U::Item,
codec: &U, codec: &U,
item: U::Item,
) -> Result<(), Either<U::Error, io::Error>> ) -> Result<(), Either<U::Error, io::Error>>
where where
U: Encoder, U: Encoder,
@ -470,22 +470,31 @@ impl<F> Io<F> {
} }
#[inline] #[inline]
/// Shut down connection /// Shut down io stream
pub async fn shutdown(&self) -> Result<(), io::Error> { pub async fn shutdown(&self) -> Result<(), io::Error> {
poll_fn(|cx| self.poll_shutdown(cx)).await poll_fn(|cx| self.poll_shutdown(cx)).await
} }
#[inline] #[inline]
/// Wake read task and instruct to read more data /// Polls for read readiness.
/// ///
/// Read task is awake only if back-pressure is enabled /// If the io stream is not currently ready for reading,
/// otherwise it is already awake. Buffer read status gets clean up. /// this method will store a clone of the Waker from the provided Context.
pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Option<io::Result<()>>> { /// When the io stream becomes ready for reading, Waker::wake will be called on the waker.
///
/// Return value
/// The function returns:
///
/// `Poll::Pending` if the io stream is not ready for reading.
/// `Poll::Ready(Ok(Some(()))))` if the io stream is ready for reading.
/// `Poll::Ready(Ok(None))` if io stream is disconnected
/// `Some(Poll::Ready(Err(e)))` if an error is encountered.
pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Option<()>>> {
if !self.0 .0.is_io_open() { if !self.0 .0.is_io_open() {
if let Some(err) = self.0 .0.error.take() { if let Some(err) = self.0 .0.error.take() {
Poll::Ready(Some(Err(err))) Poll::Ready(Err(err))
} else { } else {
Poll::Ready(None) Poll::Ready(Ok(None))
} }
} else { } else {
self.0 .0.dispatch_task.register(cx.waker()); self.0 .0.dispatch_task.register(cx.waker());
@ -498,7 +507,7 @@ impl<F> Io<F> {
self.0 .0.read_task.wake(); self.0 .0.read_task.wake();
self.0 .0.flags.set(flags); self.0 .0.flags.set(flags);
if ready { if ready {
Poll::Ready(Some(Ok(()))) Poll::Ready(Ok(Some(())))
} else { } else {
Poll::Pending Poll::Pending
} }
@ -507,7 +516,7 @@ impl<F> Io<F> {
flags.remove(Flags::RD_READY); flags.remove(Flags::RD_READY);
self.0 .0.flags.set(flags); self.0 .0.flags.set(flags);
self.0 .0.read_task.wake(); self.0 .0.read_task.wake();
Poll::Ready(Some(Ok(()))) Poll::Ready(Ok(Some(())))
} else { } else {
Poll::Pending Poll::Pending
} }
@ -523,18 +532,18 @@ impl<F> Io<F> {
&self, &self,
codec: &U, codec: &U,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Result<U::Item, Either<U::Error, io::Error>>>> ) -> Poll<Result<Option<U::Item>, Either<U::Error, io::Error>>>
where where
U: Decoder, U: Decoder,
{ {
match self.decode(codec) { match self.decode(codec) {
Ok(Some(el)) => Poll::Ready(Some(Ok(el))), Ok(Some(el)) => Poll::Ready(Ok(Some(el))),
Ok(None) => match self.poll_read_ready(cx) { Ok(None) => match self.poll_read_ready(cx) {
Poll::Pending | Poll::Ready(Some(Ok(()))) => Poll::Pending, Poll::Pending | Poll::Ready(Ok(Some(()))) => Poll::Pending,
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Either::Right(e)))), Poll::Ready(Err(e)) => Poll::Ready(Err(Either::Right(e))),
Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Ok(None)) => Poll::Ready(Ok(None)),
}, },
Err(err) => Poll::Ready(Some(Err(Either::Left(err)))), Err(err) => Poll::Ready(Err(Either::Left(err))),
} }
} }
@ -602,7 +611,7 @@ impl<F> Io<F> {
} }
#[inline] #[inline]
/// Shut down connection /// Shut down io stream
pub fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { pub fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let flags = self.flags(); let flags = self.flags();
@ -621,61 +630,6 @@ impl<F> Io<F> {
} }
} }
} }
#[doc(hidden)]
#[deprecated]
#[inline]
pub async fn next<U>(
&self,
codec: &U,
) -> Option<Result<U::Item, Either<U::Error, io::Error>>>
where
U: Decoder,
{
self.recv(codec).await
}
#[doc(hidden)]
#[deprecated]
#[inline]
pub async fn write_ready(&self, full: bool) -> Result<(), io::Error> {
poll_fn(|cx| self.poll_flush(cx, full)).await
}
#[doc(hidden)]
#[deprecated]
#[inline]
pub fn poll_write_ready(
&self,
cx: &mut Context<'_>,
full: bool,
) -> Poll<io::Result<()>> {
self.poll_flush(cx, full)
}
#[doc(hidden)]
#[deprecated]
#[inline]
#[allow(clippy::type_complexity)]
pub fn poll_read_next<U>(
&self,
codec: &U,
cx: &mut Context<'_>,
) -> Poll<Option<Result<U::Item, Either<U::Error, io::Error>>>>
where
U: Decoder,
{
self.poll_recv(codec, cx)
}
#[doc(hidden)]
#[deprecated]
#[inline]
pub fn enable_write_backpressure(&self, cx: &mut Context<'_>) {
log::trace!("enable write back-pressure for dispatcher");
self.0 .0.insert_flags(Flags::WR_BACKPRESSURE);
self.0 .0.dispatch_task.register(cx.waker());
}
} }
impl<F> Drop for Io<F> { impl<F> Drop for Io<F> {

View file

@ -321,7 +321,7 @@ mod tests {
client.read_error(io::Error::new(io::ErrorKind::Other, "err")); client.read_error(io::Error::new(io::ErrorKind::Other, "err"));
let msg = state.recv(&BytesCodec).await; let msg = state.recv(&BytesCodec).await;
assert!(msg.unwrap().is_err()); assert!(msg.is_err());
assert!(state.flags().contains(Flags::IO_ERR)); assert!(state.flags().contains(Flags::IO_ERR));
assert!(state.flags().contains(Flags::DSP_STOP)); assert!(state.flags().contains(Flags::DSP_STOP));
@ -332,7 +332,7 @@ mod tests {
client.read_error(io::Error::new(io::ErrorKind::Other, "err")); client.read_error(io::Error::new(io::ErrorKind::Other, "err"));
let res = poll_fn(|cx| Poll::Ready(state.poll_recv(&BytesCodec, cx))).await; let res = poll_fn(|cx| Poll::Ready(state.poll_recv(&BytesCodec, cx))).await;
if let Poll::Ready(msg) = res { if let Poll::Ready(msg) = res {
assert!(msg.unwrap().is_err()); assert!(msg.is_err());
assert!(state.flags().contains(Flags::IO_ERR)); assert!(state.flags().contains(Flags::IO_ERR));
assert!(state.flags().contains(Flags::DSP_STOP)); assert!(state.flags().contains(Flags::DSP_STOP));
} }
@ -341,14 +341,14 @@ mod tests {
client.remote_buffer_cap(1024); client.remote_buffer_cap(1024);
let state = Io::new(server); let state = Io::new(server);
state state
.send(Bytes::from_static(b"test"), &BytesCodec) .send(&BytesCodec, Bytes::from_static(b"test"))
.await .await
.unwrap(); .unwrap();
let buf = client.read().await.unwrap(); let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"test")); assert_eq!(buf, Bytes::from_static(b"test"));
client.write_error(io::Error::new(io::ErrorKind::Other, "err")); client.write_error(io::Error::new(io::ErrorKind::Other, "err"));
let res = state.send(Bytes::from_static(b"test"), &BytesCodec).await; let res = state.send(&BytesCodec, Bytes::from_static(b"test")).await;
assert!(res.is_err()); assert!(res.is_err());
assert!(state.flags().contains(Flags::IO_ERR)); assert!(state.flags().contains(Flags::IO_ERR));
assert!(state.flags().contains(Flags::DSP_STOP)); assert!(state.flags().contains(Flags::DSP_STOP));
@ -492,7 +492,7 @@ mod tests {
assert_eq!(msg, Bytes::from_static(BIN)); assert_eq!(msg, Bytes::from_static(BIN));
state state
.send(Bytes::from_static(b"test"), &BytesCodec) .send(&BytesCodec, Bytes::from_static(b"test"))
.await .await
.unwrap(); .unwrap();
let buf = client.read().await.unwrap(); let buf = client.read().await.unwrap();
@ -523,7 +523,7 @@ mod tests {
assert_eq!(msg, Bytes::from_static(BIN)); assert_eq!(msg, Bytes::from_static(BIN));
state state
.send(Bytes::from_static(b"test"), &BytesCodec) .send(&BytesCodec, Bytes::from_static(b"test"))
.await .await
.unwrap(); .unwrap();
let buf = client.read().await.unwrap(); let buf = client.read().await.unwrap();

View file

@ -2,7 +2,7 @@ use std::task::{Context, Poll};
use std::{any, cell::RefCell, cmp, future::Future, io, mem, pin::Pin, rc::Rc}; use std::{any, cell::RefCell, cmp, future::Future, io, mem, pin::Pin, rc::Rc};
use ntex_bytes::{Buf, BufMut, BytesMut}; use ntex_bytes::{Buf, BufMut, BytesMut};
use ntex_util::time::{sleep, Sleep}; use ntex_util::{ready, time::sleep, time::Sleep};
use tok_io::io::{AsyncRead, AsyncWrite, ReadBuf}; use tok_io::io::{AsyncRead, AsyncWrite, ReadBuf};
use tok_io::net::TcpStream; use tok_io::net::TcpStream;
@ -362,10 +362,10 @@ impl<F: Filter> AsyncRead for Io<F> {
}); });
if len == 0 { if len == 0 {
match self.poll_read_ready(cx) { match ready!(self.poll_read_ready(cx)) {
Poll::Pending | Poll::Ready(Some(Ok(()))) => Poll::Pending, Ok(Some(())) => Poll::Pending,
Poll::Ready(Some(Err(e))) => Poll::Ready(Err(e)), Ok(None) => Poll::Ready(Ok(())),
Poll::Ready(None) => Poll::Ready(Ok(())), Err(e) => Poll::Ready(Err(e)),
} }
} else { } else {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
@ -407,10 +407,10 @@ impl AsyncRead for IoBoxed {
}); });
if len == 0 { if len == 0 {
match self.poll_read_ready(cx) { match ready!(self.poll_read_ready(cx)) {
Poll::Pending | Poll::Ready(Some(Ok(()))) => Poll::Pending, Ok(Some(())) => Poll::Pending,
Poll::Ready(Some(Err(e))) => Poll::Ready(Err(e)), Err(e) => Poll::Ready(Err(e)),
Poll::Ready(None) => Poll::Ready(Ok(())), Ok(None) => Poll::Ready(Ok(())),
} }
} else { } else {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))

View file

@ -20,13 +20,14 @@ async fn main() -> io::Result<()> {
let io = connector.connect("127.0.0.1:8443").await.unwrap(); let io = connector.connect("127.0.0.1:8443").await.unwrap();
println!("Connected to ssl server"); println!("Connected to ssl server");
let result = io let result = io
.send(Bytes::from_static(b"hello"), &codec::BytesCodec) .send(&codec::BytesCodec, Bytes::from_static(b"hello"))
.await .await
.map_err(Either::into_inner)?; .map_err(Either::into_inner)?;
let resp = io let resp = io
.next(&codec::BytesCodec) .recv(&codec::BytesCodec)
.await .await
.map_err(|e| e.into_inner())?
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "disconnected"))?; .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "disconnected"))?;
println!("disconnecting"); println!("disconnecting");

View file

@ -41,25 +41,25 @@ async fn main() -> io::Result<()> {
println!("New client is connected"); println!("New client is connected");
io.send( io.send(
ntex_bytes::Bytes::from_static(b"Welcome!\n"),
&codec::BytesCodec, &codec::BytesCodec,
ntex_bytes::Bytes::from_static(b"Welcome!\n"),
) )
.await .await
.map_err(Either::into_inner)?; .map_err(Either::into_inner)?;
loop { loop {
match io.next(&codec::BytesCodec).await { match io.recv(&codec::BytesCodec).await {
Some(Ok(msg)) => { Ok(Some(msg)) => {
println!("Got message: {:?}", msg); println!("Got message: {:?}", msg);
io.send(msg.freeze(), &codec::BytesCodec) io.send(&codec::BytesCodec, msg.freeze())
.await .await
.map_err(Either::into_inner)?; .map_err(Either::into_inner)?;
} }
Some(Err(e)) => { Err(e) => {
println!("Got error: {:?}", e); println!("Got error: {:?}", e);
break; break;
} }
None => break, Ok(None) => break,
} }
} }
println!("Client is disconnected"); println!("Client is disconnected");

View file

@ -29,18 +29,18 @@ async fn main() -> io::Result<()> {
.and_then(fn_service(|io: Io<_>| async move { .and_then(fn_service(|io: Io<_>| async move {
println!("New client is connected"); println!("New client is connected");
loop { loop {
match io.next(&codec::BytesCodec).await { match io.recv(&codec::BytesCodec).await {
Some(Ok(msg)) => { Ok(Some(msg)) => {
println!("Got message: {:?}", msg); println!("Got message: {:?}", msg);
io.send(msg.freeze(), &codec::BytesCodec) io.send(&codec::BytesCodec, msg.freeze())
.await .await
.map_err(Either::into_inner)?; .map_err(Either::into_inner)?;
} }
Some(Err(e)) => { Err(e) => {
println!("Got error: {:?}", e); println!("Got error: {:?}", e);
break; break;
} }
None => break, Ok(None) => break,
} }
} }
println!("Client is disconnected"); println!("Client is disconnected");

View file

@ -341,10 +341,10 @@ fn handle_result<T, F>(
Err(e) => match e.code() { Err(e) => match e.code() {
ssl::ErrorCode::WANT_READ => { ssl::ErrorCode::WANT_READ => {
match ready!(io.poll_read_ready(cx)) { match ready!(io.poll_read_ready(cx)) {
None => Err::<_, Box<dyn Error>>( Ok(None) => Err::<_, Box<dyn Error>>(
io::Error::new(io::ErrorKind::Other, "disconnected").into(), io::Error::new(io::ErrorKind::Other, "disconnected").into(),
), ),
Some(Err(err)) => Err(err.into()), Err(err) => Err(err.into()),
_ => Ok(()), _ => Ok(()),
}?; }?;
Poll::Pending Poll::Pending

View file

@ -251,11 +251,11 @@ impl<F: Filter> TlsClientFilter<F> {
}; };
if result.is_ok() && wants_read { if result.is_ok() && wants_read {
poll_fn(|cx| match ready!(io.poll_read_ready(cx)) { poll_fn(|cx| match ready!(io.poll_read_ready(cx)) {
None => Poll::Ready(Err(io::Error::new( Ok(None) => Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Other,
"disconnected", "disconnected",
))), ))),
Some(Err(e)) => Poll::Ready(Err(e)), Err(e) => Poll::Ready(Err(e)),
_ => Poll::Ready(Ok(())), _ => Poll::Ready(Ok(())),
}) })
.await?; .await?;
@ -265,13 +265,12 @@ impl<F: Filter> TlsClientFilter<F> {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
poll_fn(|cx| { poll_fn(|cx| {
let read_ready = if wants_read { let read_ready = if wants_read {
match ready!(io.poll_read_ready(cx)) { match ready!(io.poll_read_ready(cx))? {
Some(_) => Ok(true),
None => Err(io::Error::new( None => Err(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Other,
"disconnected", "disconnected",
)), )),
Some(Err(e)) => Err(e),
Some(Ok(_)) => Ok(true),
}? }?
} else { } else {
true true

View file

@ -251,12 +251,11 @@ impl<F: Filter> TlsServerFilter<F> {
}; };
if result.is_ok() && wants_read { if result.is_ok() && wants_read {
poll_fn(|cx| { poll_fn(|cx| {
match ready!(io.poll_read_ready(cx)) { match ready!(io.poll_read_ready(cx))? {
Some(_) => Ok(()),
None => { None => {
Err(io::Error::new(io::ErrorKind::Other, "disconnected")) Err(io::Error::new(io::ErrorKind::Other, "disconnected"))
} }
Some(Err(e)) => Err(e),
_ => Ok(()),
}?; }?;
Poll::Ready(Ok::<_, io::Error>(())) Poll::Ready(Ok::<_, io::Error>(()))
}) })
@ -267,13 +266,12 @@ impl<F: Filter> TlsServerFilter<F> {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
poll_fn(|cx| { poll_fn(|cx| {
let read_ready = if wants_read { let read_ready = if wants_read {
match ready!(io.poll_read_ready(cx)) { match ready!(io.poll_read_ready(cx))? {
Some(_) => Ok(true),
None => Err(io::Error::new( None => Err(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Other,
"disconnected", "disconnected",
)), )),
Some(Err(e)) => Err(e),
Some(Ok(_)) => Ok(true),
}? }?
} else { } else {
true true

View file

@ -59,7 +59,7 @@ where
// send request // send request
let codec = h1::ClientCodec::default(); let codec = h1::ClientCodec::default();
io.send((head, body.size()).into(), &codec).await?; io.send(&codec, (head, body.size()).into()).await?;
log::trace!("http1 request has been sent"); log::trace!("http1 request has been sent");
@ -74,8 +74,7 @@ where
log::trace!("reading http1 response"); log::trace!("reading http1 response");
// read response and init read body // read response and init read body
let head = if let Some(result) = io.recv(&codec).await { let head = if let Some(result) = io.recv(&codec).await? {
let result = result?;
log::trace!( log::trace!(
"http1 response is received, type: {:?}, response: {:#?}", "http1 response is received, type: {:?}, response: {:#?}",
codec.message_type(), codec.message_type(),
@ -105,11 +104,11 @@ pub(super) async fn open_tunnel(
) -> Result<(ResponseHead, IoBoxed, h1::ClientCodec), SendRequestError> { ) -> Result<(ResponseHead, IoBoxed, h1::ClientCodec), SendRequestError> {
// create Framed and send request // create Framed and send request
let codec = h1::ClientCodec::default(); let codec = h1::ClientCodec::default();
io.send((head, BodySize::None).into(), &codec).await?; io.send(&codec, (head, BodySize::None).into()).await?;
// read response // read response
if let Some(head) = io.recv(&codec).await { if let Some(head) = io.recv(&codec).await? {
Ok((head?, io, codec)) Ok((head, io, codec))
} else { } else {
Err(SendRequestError::from(ConnectError::Disconnected)) Err(SendRequestError::from(ConnectError::Disconnected))
} }

View file

@ -285,7 +285,7 @@ where
// decode incoming bytes stream // decode incoming bytes stream
match io.poll_recv(&this.inner.codec, cx) { match io.poll_recv(&this.inner.codec, cx) {
Poll::Ready(Some(Ok((mut req, pl)))) => { Poll::Ready(Ok(Some((mut req, pl)))) => {
log::trace!( log::trace!(
"http message is received: {:?} and payload {:?}", "http message is received: {:?} and payload {:?}",
req, req,
@ -350,7 +350,13 @@ where
); );
} }
} }
Poll::Ready(Some(Err(Either::Left(err)))) => { Poll::Ready(Ok(None)) => {
// peer is gone
log::trace!("peer is gone");
let e = DispatchError::Disconnect(None);
set_error!(this, e);
}
Poll::Ready(Err(Either::Left(err))) => {
// Malformed requests, respond with 400 // Malformed requests, respond with 400
log::trace!("malformed request: {:?}", err); log::trace!("malformed request: {:?}", err);
let (res, body) = let (res, body) =
@ -358,18 +364,12 @@ where
this.inner.error = Some(DispatchError::Parse(err)); this.inner.error = Some(DispatchError::Parse(err));
*this.st = this.inner.send_response(res, body.into_body()); *this.st = this.inner.send_response(res, body.into_body());
} }
Poll::Ready(Some(Err(Either::Right(err)))) => { Poll::Ready(Err(Either::Right(err))) => {
log::trace!("peer is gone with {:?}", err); log::trace!("peer is gone with {:?}", err);
// peer is gone // peer is gone
let e = DispatchError::Disconnect(Some(err)); let e = DispatchError::Disconnect(Some(err));
set_error!(this, e); set_error!(this, e);
} }
Poll::Ready(None) => {
log::trace!("peer is gone");
// peer is gone
let e = DispatchError::Disconnect(None);
set_error!(this, e);
}
Poll::Pending => { Poll::Pending => {
log::trace!("not enough data to decode http message"); log::trace!("not enough data to decode http message");
return Poll::Pending; return Poll::Pending;
@ -612,11 +612,11 @@ where
loop { loop {
let res = io.poll_recv(&payload.0, cx); let res = io.poll_recv(&payload.0, cx);
match res { match res {
Poll::Ready(Some(Ok(PayloadItem::Chunk(chunk)))) => { Poll::Ready(Ok(Some(PayloadItem::Chunk(chunk)))) => {
updated = true; updated = true;
payload.1.feed_data(chunk); payload.1.feed_data(chunk);
} }
Poll::Ready(Some(Ok(PayloadItem::Eof))) => { Poll::Ready(Ok(Some(PayloadItem::Eof))) => {
payload.1.feed_eof(); payload.1.feed_eof();
self.payload = None; self.payload = None;
if !updated { if !updated {
@ -624,12 +624,12 @@ where
} }
break; break;
} }
Poll::Ready(None) => { Poll::Ready(Ok(None)) => {
payload.1.set_error(PayloadError::EncodingCorrupted); payload.1.set_error(PayloadError::EncodingCorrupted);
self.payload = None; self.payload = None;
return Poll::Ready(Err(ParseError::Incomplete.into())); return Poll::Ready(Err(ParseError::Incomplete.into()));
} }
Poll::Ready(Some(Err(e))) => { Poll::Ready(Err(e)) => {
payload.1.set_error(PayloadError::EncodingCorrupted); payload.1.set_error(PayloadError::EncodingCorrupted);
self.payload = None; self.payload = None;
return Poll::Ready(Err(match e { return Poll::Ready(Err(match e {

View file

@ -12,7 +12,7 @@ use ntex::util::Bytes;
async fn test_string() { async fn test_string() {
let srv = test_server(|| { let srv = test_server(|| {
fn_service(|io: Io| async move { fn_service(|io: Io| async move {
io.send(Bytes::from_static(b"test"), &BytesCodec) io.send(&BytesCodec, Bytes::from_static(b"test"))
.await .await
.unwrap(); .unwrap();
Ok::<_, io::Error>(()) Ok::<_, io::Error>(())
@ -30,7 +30,7 @@ async fn test_string() {
async fn test_rustls_string() { async fn test_rustls_string() {
let srv = test_server(|| { let srv = test_server(|| {
fn_service(|io: Io| async move { fn_service(|io: Io| async move {
io.send(Bytes::from_static(b"test"), &BytesCodec) io.send(&BytesCodec, Bytes::from_static(b"test"))
.await .await
.unwrap(); .unwrap();
Ok::<_, io::Error>(()) Ok::<_, io::Error>(())
@ -47,7 +47,7 @@ async fn test_rustls_string() {
async fn test_static_str() { async fn test_static_str() {
let srv = test_server(|| { let srv = test_server(|| {
fn_service(|io: Io| async move { fn_service(|io: Io| async move {
io.send(Bytes::from_static(b"test"), &BytesCodec) io.send(&BytesCodec, Bytes::from_static(b"test"))
.await .await
.unwrap(); .unwrap();
Ok::<_, io::Error>(()) Ok::<_, io::Error>(())
@ -69,7 +69,7 @@ async fn test_static_str() {
async fn test_new_service() { async fn test_new_service() {
let srv = test_server(|| { let srv = test_server(|| {
fn_service(|io: Io| async move { fn_service(|io: Io| async move {
io.send(Bytes::from_static(b"test"), &BytesCodec) io.send(&BytesCodec, Bytes::from_static(b"test"))
.await .await
.unwrap(); .unwrap();
Ok::<_, io::Error>(()) Ok::<_, io::Error>(())
@ -89,7 +89,7 @@ async fn test_uri() {
let srv = test_server(|| { let srv = test_server(|| {
fn_service(|io: Io| async move { fn_service(|io: Io| async move {
io.send(Bytes::from_static(b"test"), &BytesCodec) io.send(&BytesCodec, Bytes::from_static(b"test"))
.await .await
.unwrap(); .unwrap();
Ok::<_, io::Error>(()) Ok::<_, io::Error>(())
@ -111,7 +111,7 @@ async fn test_rustls_uri() {
let srv = test_server(|| { let srv = test_server(|| {
fn_service(|io: Io| async move { fn_service(|io: Io| async move {
io.send(Bytes::from_static(b"test"), &BytesCodec) io.send(&BytesCodec, Bytes::from_static(b"test"))
.await .await
.unwrap(); .unwrap();
Ok::<_, io::Error>(()) Ok::<_, io::Error>(())

View file

@ -54,27 +54,27 @@ async fn test_simple() {
// client service // client service
let (_, io, codec) = srv.ws().await.unwrap().into_inner(); let (_, io, codec) = srv.ws().await.unwrap().into_inner();
io.send(ws::Message::Text(ByteString::from_static("text")), &codec) io.send(&codec, ws::Message::Text(ByteString::from_static("text")))
.await .await
.unwrap(); .unwrap();
let item = io.recv(&codec).await.unwrap().unwrap(); let item = io.recv(&codec).await.unwrap().unwrap();
assert_eq!(item, ws::Frame::Text(Bytes::from_static(b"text"))); assert_eq!(item, ws::Frame::Text(Bytes::from_static(b"text")));
io.send(ws::Message::Binary("text".into()), &codec) io.send(&codec, ws::Message::Binary("text".into()))
.await .await
.unwrap(); .unwrap();
let item = io.recv(&codec).await.unwrap().unwrap(); let item = io.recv(&codec).await.unwrap().unwrap();
assert_eq!(item, ws::Frame::Binary(Bytes::from_static(b"text"))); assert_eq!(item, ws::Frame::Binary(Bytes::from_static(b"text")));
io.send(ws::Message::Ping("text".into()), &codec) io.send(&codec, ws::Message::Ping("text".into()))
.await .await
.unwrap(); .unwrap();
let item = io.recv(&codec).await.unwrap().unwrap(); let item = io.recv(&codec).await.unwrap().unwrap();
assert_eq!(item, ws::Frame::Pong("text".to_string().into())); assert_eq!(item, ws::Frame::Pong("text".to_string().into()));
io.send( io.send(
ws::Message::Close(Some(ws::CloseCode::Normal.into())),
&codec, &codec,
ws::Message::Close(Some(ws::CloseCode::Normal.into())),
) )
.await .await
.unwrap(); .unwrap();

View file

@ -97,7 +97,7 @@ async fn test_simple() {
assert_eq!(conn.response().status(), StatusCode::SWITCHING_PROTOCOLS); assert_eq!(conn.response().status(), StatusCode::SWITCHING_PROTOCOLS);
let (_, io, codec) = conn.into_inner(); let (_, io, codec) = conn.into_inner();
io.send(ws::Message::Text(ByteString::from_static("text")), &codec) io.send(&codec, ws::Message::Text(ByteString::from_static("text")))
.await .await
.unwrap(); .unwrap();
let item = io.recv(&codec).await; let item = io.recv(&codec).await;
@ -106,7 +106,7 @@ async fn test_simple() {
ws::Frame::Text(Bytes::from_static(b"text")) ws::Frame::Text(Bytes::from_static(b"text"))
); );
io.send(ws::Message::Binary("text".into()), &codec) io.send(&codec, ws::Message::Binary("text".into()))
.await .await
.unwrap(); .unwrap();
let item = io.recv(&codec).await; let item = io.recv(&codec).await;
@ -115,7 +115,7 @@ async fn test_simple() {
ws::Frame::Binary(Bytes::from_static(&b"text"[..])) ws::Frame::Binary(Bytes::from_static(&b"text"[..]))
); );
io.send(ws::Message::Ping("text".into()), &codec) io.send(&codec, ws::Message::Ping("text".into()))
.await .await
.unwrap(); .unwrap();
let item = io.recv(&codec).await; let item = io.recv(&codec).await;
@ -125,8 +125,8 @@ async fn test_simple() {
); );
io.send( io.send(
ws::Message::Continuation(ws::Item::FirstText("text".into())),
&codec, &codec,
ws::Message::Continuation(ws::Item::FirstText("text".into())),
) )
.await .await
.unwrap(); .unwrap();
@ -138,22 +138,22 @@ async fn test_simple() {
assert!(io assert!(io
.send( .send(
&codec,
ws::Message::Continuation(ws::Item::FirstText("text".into())), ws::Message::Continuation(ws::Item::FirstText("text".into())),
&codec
) )
.await .await
.is_err()); .is_err());
assert!(io assert!(io
.send( .send(
&codec,
ws::Message::Continuation(ws::Item::FirstBinary("text".into())), ws::Message::Continuation(ws::Item::FirstBinary("text".into())),
&codec
) )
.await .await
.is_err()); .is_err());
io.send( io.send(
ws::Message::Continuation(ws::Item::Continue("text".into())),
&codec, &codec,
ws::Message::Continuation(ws::Item::Continue("text".into())),
) )
.await .await
.unwrap(); .unwrap();
@ -164,8 +164,8 @@ async fn test_simple() {
); );
io.send( io.send(
ws::Message::Continuation(ws::Item::Last("text".into())),
&codec, &codec,
ws::Message::Continuation(ws::Item::Last("text".into())),
) )
.await .await
.unwrap(); .unwrap();
@ -177,23 +177,23 @@ async fn test_simple() {
assert!(io assert!(io
.send( .send(
&codec,
ws::Message::Continuation(ws::Item::Continue("text".into())), ws::Message::Continuation(ws::Item::Continue("text".into())),
&codec
) )
.await .await
.is_err()); .is_err());
assert!(io assert!(io
.send( .send(
&codec,
ws::Message::Continuation(ws::Item::Last("text".into())), ws::Message::Continuation(ws::Item::Last("text".into())),
&codec
) )
.await .await
.is_err()); .is_err());
io.send( io.send(
ws::Message::Continuation(ws::Item::FirstBinary(Bytes::from_static(b"bin"))),
&codec, &codec,
ws::Message::Continuation(ws::Item::FirstBinary(Bytes::from_static(b"bin"))),
) )
.await .await
.unwrap(); .unwrap();
@ -204,8 +204,8 @@ async fn test_simple() {
); );
io.send( io.send(
ws::Message::Continuation(ws::Item::Continue("text".into())),
&codec, &codec,
ws::Message::Continuation(ws::Item::Continue("text".into())),
) )
.await .await
.unwrap(); .unwrap();
@ -216,8 +216,8 @@ async fn test_simple() {
); );
io.send( io.send(
ws::Message::Continuation(ws::Item::Last("text".into())),
&codec, &codec,
ws::Message::Continuation(ws::Item::Last("text".into())),
) )
.await .await
.unwrap(); .unwrap();
@ -228,8 +228,8 @@ async fn test_simple() {
); );
io.send( io.send(
ws::Message::Close(Some(ws::CloseCode::Normal.into())),
&codec, &codec,
ws::Message::Close(Some(ws::CloseCode::Normal.into())),
) )
.await .await
.unwrap(); .unwrap();

View file

@ -77,7 +77,7 @@ fn test_run() {
.disable_signals() .disable_signals()
.bind("test", addr, move |_| { .bind("test", addr, move |_| {
fn_service(|io: Io| async move { fn_service(|io: Io| async move {
io.send(Bytes::from_static(b"test"), &BytesCodec) io.send(&BytesCodec, Bytes::from_static(b"test"))
.await .await
.unwrap(); .unwrap();
Ok::<_, ()>(()) Ok::<_, ()>(())

View file

@ -38,27 +38,27 @@ async fn web_ws() {
// client service // client service
let (_, io, codec) = srv.ws().await.unwrap().into_inner(); let (_, io, codec) = srv.ws().await.unwrap().into_inner();
io.send(ws::Message::Text(ByteString::from_static("text")), &codec) io.send(&codec, ws::Message::Text(ByteString::from_static("text")))
.await .await
.unwrap(); .unwrap();
let item = io.recv(&codec).await.unwrap().unwrap(); let item = io.recv(&codec).await.unwrap().unwrap();
assert_eq!(item, ws::Frame::Text(Bytes::from_static(b"text"))); assert_eq!(item, ws::Frame::Text(Bytes::from_static(b"text")));
io.send(ws::Message::Binary("text".into()), &codec) io.send(&codec, ws::Message::Binary("text".into()))
.await .await
.unwrap(); .unwrap();
let item = io.recv(&codec).await.unwrap().unwrap(); let item = io.recv(&codec).await.unwrap().unwrap();
assert_eq!(item, ws::Frame::Binary(Bytes::from_static(b"text"))); assert_eq!(item, ws::Frame::Binary(Bytes::from_static(b"text")));
io.send(ws::Message::Ping("text".into()), &codec) io.send(&codec, ws::Message::Ping("text".into()))
.await .await
.unwrap(); .unwrap();
let item = io.recv(&codec).await.unwrap().unwrap(); let item = io.recv(&codec).await.unwrap().unwrap();
assert_eq!(item, ws::Frame::Pong("text".to_string().into())); assert_eq!(item, ws::Frame::Pong("text".to_string().into()));
io.send( io.send(
ws::Message::Close(Some(ws::CloseCode::Normal.into())),
&codec, &codec,
ws::Message::Close(Some(ws::CloseCode::Normal.into())),
) )
.await .await
.unwrap(); .unwrap();