Add types::Payload::recv() and types::Payload::poll_recv() methods

This commit is contained in:
Nikolay Kim 2022-01-19 16:45:10 +06:00
parent 14a1cd0b12
commit 05f3231180
6 changed files with 34 additions and 11 deletions

View file

@ -46,6 +46,7 @@ where
}
#[doc(hidden)]
#[deprecated(since = "0.1.4", note = "Use stream_recv() fn instead")]
pub async fn next<S>(stream: &mut S) -> Option<S::Item>
where
S: Stream + Unpin,

View file

@ -1,5 +1,9 @@
# Changes
## [0.5.11] - 2022-01-xx
* web: Add types::Payload::recv() and types::Payload::poll_recv() methods
## [0.5.10] - 2022-01-17
* rt: Add glommio runtime support

View file

@ -19,7 +19,7 @@ use crate::service::{
map_config, IntoService, IntoServiceFactory, Service, ServiceFactory,
};
use crate::time::{sleep, Millis, Seconds};
use crate::util::{next, Bytes, BytesMut, Extensions, Ready, Stream};
use crate::util::{stream_recv, Bytes, BytesMut, Extensions, Ready, Stream};
use crate::ws::{error::WsClientError, WsClient, WsConnection};
use crate::{io::Sealed, rt::System, server::Server};
@ -149,7 +149,7 @@ where
let mut body = resp.take_body();
let mut bytes = BytesMut::new();
while let Some(item) = next(&mut body).await {
while let Some(item) = stream_recv(&mut body).await {
bytes.extend_from_slice(&item.unwrap());
}
bytes.freeze()
@ -184,7 +184,7 @@ where
pub async fn read_body(mut res: WebResponse) -> Bytes {
let mut body = res.take_body();
let mut bytes = BytesMut::new();
while let Some(item) = next(&mut body).await {
while let Some(item) = stream_recv(&mut body).await {
bytes.extend_from_slice(&item.unwrap());
}
bytes.freeze()
@ -196,7 +196,7 @@ where
S: Stream<Item = Result<Bytes, Box<dyn Error>>> + Unpin,
{
let mut data = BytesMut::new();
while let Some(item) = next(&mut stream).await {
while let Some(item) = stream_recv(&mut stream).await {
data.extend_from_slice(&item?);
}
Ok(data.freeze())

View file

@ -8,7 +8,7 @@ use serde::{de::DeserializeOwned, Serialize};
use crate::http::encoding::Decoder;
use crate::http::header::{CONTENT_LENGTH, CONTENT_TYPE};
use crate::http::{HttpMessage, Payload, Response, StatusCode};
use crate::util::{next, BytesMut};
use crate::util::{stream_recv, BytesMut};
use crate::web::error::{ErrorRenderer, UrlencodedError, WebResponseError};
use crate::web::responder::{Ready, Responder};
use crate::web::{FromRequest, HttpRequest};
@ -309,7 +309,7 @@ where
self.fut = Some(Box::pin(async move {
let mut body = BytesMut::with_capacity(8192);
while let Some(item) = next(&mut stream).await {
while let Some(item) = stream_recv(&mut stream).await {
let chunk = item?;
if (body.len() + chunk.len()) > limit {
return Err(UrlencodedError::Overflow {

View file

@ -7,7 +7,7 @@ use serde::{de::DeserializeOwned, Serialize};
use crate::http::encoding::Decoder;
use crate::http::header::CONTENT_LENGTH;
use crate::http::{HttpMessage, Payload, Response, StatusCode};
use crate::util::{next, BytesMut};
use crate::util::{stream_recv, BytesMut};
use crate::web::error::{ErrorRenderer, JsonError, JsonPayloadError, WebResponseError};
use crate::web::responder::{Ready, Responder};
use crate::web::{FromRequest, HttpRequest};
@ -354,7 +354,7 @@ where
self.fut = Some(Box::pin(async move {
let mut body = BytesMut::with_capacity(8192);
while let Some(item) = next(&mut stream).await {
while let Some(item) = stream_recv(&mut stream).await {
let chunk = item?;
if (body.len() + chunk.len()) > limit {
return Err(JsonPayloadError::Overflow);

View file

@ -5,7 +5,7 @@ use encoding_rs::UTF_8;
use mime::Mime;
use crate::http::{error, header, HttpMessage};
use crate::util::{next, Bytes, BytesMut, Either, Ready, Stream};
use crate::util::{stream_recv, Bytes, BytesMut, Either, Ready, Stream};
use crate::web::error::{ErrorRenderer, PayloadError};
use crate::web::{FromRequest, HttpRequest};
@ -41,10 +41,28 @@ use crate::web::{FromRequest, HttpRequest};
pub struct Payload(pub crate::http::Payload);
impl Payload {
#[inline]
/// Deconstruct to a inner value
pub fn into_inner(self) -> crate::http::Payload {
self.0
}
#[inline]
/// Attempt to pull out the next value of this payload.
pub async fn recv(&mut self) -> Option<Result<Bytes, error::PayloadError>> {
self.0.recv().await
}
#[inline]
/// Attempt to pull out the next value of this payload, registering
/// the current task for wakeup if the value is not yet available,
/// and returning None if the payload is exhausted.
pub fn poll_recv(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, error::PayloadError>>> {
self.0.poll_recv(cx)
}
}
impl Stream for Payload {
@ -55,7 +73,7 @@ impl Stream for Payload {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.0).poll_next(cx)
self.poll_recv(cx)
}
}
@ -378,7 +396,7 @@ impl Future for HttpMessageBody {
self.fut = Some(Box::pin(async move {
let mut body = BytesMut::with_capacity(8192);
while let Some(item) = next(&mut stream).await {
while let Some(item) = stream_recv(&mut stream).await {
let chunk = item?;
if body.len() + chunk.len() > limit {
return Err(PayloadError::from(error::PayloadError::Overflow));