add ntex-multipart (#1)

Co-authored-by: krircc <krircc@aliyun.com>
This commit is contained in:
krircc 2020-04-05 12:28:02 +08:00 committed by GitHub
parent c976ad1c3a
commit 69809a7048
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 1221 additions and 2 deletions

79
.github/workflows/linux.yml vendored Normal file
View file

@ -0,0 +1,79 @@
name: CI (Linux)
on: [push, pull_request]
jobs:
build_and_test:
strategy:
fail-fast: false
matrix:
version:
- 1.42.0 # MSRV
- stable
- nightly
name: ${{ matrix.version }} - x86_64-unknown-linux-gnu
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.version }}-x86_64-unknown-linux-gnu
profile: minimal
override: true
- name: Generate Cargo.lock
uses: actions-rs/cargo@v1
with:
command: generate-lockfile
- name: Cache cargo registry
uses: actions/cache@v1
with:
path: ~/.cargo/registry
key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-registry-trimmed-${{ hashFiles('**/Cargo.lock') }}
- name: Cache cargo index
uses: actions/cache@v1
with:
path: ~/.cargo/git
key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-index-trimmed-${{ hashFiles('**/Cargo.lock') }}
- name: Cache cargo build
uses: actions/cache@v1
with:
path: target
key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }}
- name: check build
uses: actions-rs/cargo@v1
with:
command: check
args: --all --all-features --bins --examples --tests
- name: tests
uses: actions-rs/cargo@v1
timeout-minutes: 40
with:
command: test
args: --all --all-features --no-fail-fast -- --nocapture
- name: Generate coverage file
if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
run: |
cargo install cargo-tarpaulin
cargo tarpaulin --out Xml --all --all-features
- name: Upload to Codecov
if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
uses: codecov/codecov-action@v1
with:
file: cobertura.xml
- name: Clear the cargo caches
run: |
cargo install cargo-cache --no-default-features --features ci-autoclean
cargo-cache

65
.github/workflows/osx.yml vendored Normal file
View file

@ -0,0 +1,65 @@
name: CI (OSX)
on: [push, pull_request]
jobs:
build_and_test:
strategy:
fail-fast: false
matrix:
version:
- stable
- nightly
name: ${{ matrix.version }} - x86_64-apple-darwin
runs-on: macOS-latest
steps:
- uses: actions/checkout@master
- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.version }}-x86_64-apple-darwin
profile: minimal
override: true
- name: Generate Cargo.lock
uses: actions-rs/cargo@v1
with:
command: generate-lockfile
- name: Cache cargo registry
uses: actions/cache@v1
with:
path: ~/.cargo/registry
key: ${{ matrix.version }}-x86_64-apple-darwin-cargo-registry-trimmed-${{ hashFiles('**/Cargo.lock') }}
- name: Cache cargo index
uses: actions/cache@v1
with:
path: ~/.cargo/git
key: ${{ matrix.version }}-x86_64-apple-darwin-cargo-index-trimmed-${{ hashFiles('**/Cargo.lock') }}
- name: Cache cargo build
uses: actions/cache@v1
with:
path: target
key: ${{ matrix.version }}-x86_64-apple-darwin-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }}
- name: check build
uses: actions-rs/cargo@v1
with:
command: check
args: --all --all-features --bins --examples --tests
- name: tests
uses: actions-rs/cargo@v1
with:
command: test
args: --all --all-features --no-fail-fast -- --nocapture
- name: Clear the cargo caches
run: |
cargo install cargo-cache --no-default-features --features ci-autoclean
cargo-cache

49
.github/workflows/windows.yml vendored Normal file
View file

@ -0,0 +1,49 @@
name: CI (Windows)
on: [push, pull_request]
env:
VCPKGRS_DYNAMIC: 1
jobs:
build_and_test:
strategy:
fail-fast: false
matrix:
version:
- stable
- nightly
name: ${{ matrix.version }} - x86_64-pc-windows-msvc
runs-on: windows-latest
steps:
- uses: actions/checkout@master
- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.version }}-x86_64-pc-windows-msvc
profile: minimal
override: true
- name: Install OpenSSL
run: |
vcpkg integrate install
vcpkg install openssl:x64-windows
Copy-Item C:\vcpkg\installed\x64-windows\bin\libcrypto-1_1-x64.dll C:\vcpkg\installed\x64-windows\bin\libcrypto.dll
Copy-Item C:\vcpkg\installed\x64-windows\bin\libssl-1_1-x64.dll C:\vcpkg\installed\x64-windows\bin\libssl.dll
Get-ChildItem C:\vcpkg\installed\x64-windows\bin
Get-ChildItem C:\vcpkg\installed\x64-windows\lib
- name: check build
uses: actions-rs/cargo@v1
with:
command: check
args: --all --all-features --bins --examples --tests
- name: tests
uses: actions-rs/cargo@v1
with:
command: test
args: --all --all-features --no-fail-fast -- --nocapture

13
.gitignore vendored Normal file
View file

@ -0,0 +1,13 @@
/target
Cargo.lock
guide/build/
/gh-pages
*.so
*.out
*.pid
*.sock
*~
# These are backup files generated by rustfmt
**/*.rs.bk

4
Cargo.toml Normal file
View file

@ -0,0 +1,4 @@
[workspace]
members = [
"ntex-multipart",
]

25
LICENSE Normal file
View file

@ -0,0 +1,25 @@
Copyright (c) 2020 Nikolay Kim, krircc
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View file

@ -1,2 +1,26 @@
ntex-extras
===========
<div align="center">
<p><h1>ntex-extras</h1> </p>
<p><strong>A collection of additional crates supporting the ntex frameworks.</strong> </p>
<p>
[![Build Status](https://github.com/ntex-rs/ntex-extras/workflows/CI%20(Linux)/badge.svg)](https://travis-ci.org/ntex-rs/ntex)
[![Version](https://img.shields.io/badge/rustc-1.42+-lightgray.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.42.html)
![License](https://img.shields.io/crates/l/ntex-extras.svg)
</p>
</div>
## Crates
| Crate | | |
| -------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------- |
| [ntex-multipart] | [![crates.io](https://img.shields.io/crates/v/ntex-multipart)](https://crates.io/crates/ntex-multipart) [![Documentation](https://docs.rs/ntex-multipart/badge.svg)](https://docs.rs/ntex-multipart) | Multipart support for ntex applications. |
<!-- REFERENCES -->
[ntex-multipart]: ntex-multipart
## License
This project is licensed under
* MIT license ([LICENSE-MIT](LICENSE-MIT) or [http://opensource.org/licenses/MIT](http://opensource.org/licenses/MIT))

View file

@ -0,0 +1,5 @@
# Changes
## [0.1.0] - 2020-04-05
* Fork to ntex namespace

25
ntex-multipart/Cargo.toml Normal file
View file

@ -0,0 +1,25 @@
[package]
name = "ntex-multipart"
version = "0.1.0"
authors = ["krircc <krircc@qq.com>"]
description = "Multipart support for ntex framework."
readme = "README.md"
keywords = ["http", "web", "framework", "async", "futures"]
homepage = "https://ntex.rs"
repository = "https://github.com/ntex/ntex-extras.git"
documentation = "https://docs.rs/ntex-multipart/"
license = "MIT"
edition = "2018"
[dependencies]
ntex = "0.1.1"
bytes = "0.5.3"
derive_more = "0.99.2"
httparse = "1.3"
futures = "0.3.1"
log = "0.4"
mime = "0.3"
twoway = "0.2"
[dev-dependencies]
ntex = "0.1.1"

25
ntex-multipart/LICENSE Normal file
View file

@ -0,0 +1,25 @@
Copyright (c) 2020 krircc
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

7
ntex-multipart/README.md Normal file
View file

@ -0,0 +1,7 @@
# Multipart support for ntex framework
## Documentation & community resources
* [API Documentation](https://docs.rs/ntex-multipart/)
* Cargo package: [actix-multipart](https://crates.io/crates/ntex-multipart)
* Minimum supported Rust version: 1.42 or later

View file

@ -0,0 +1,55 @@
//! Error and Result module
use derive_more::{Display, From};
use ntex::http::error::{ParseError, PayloadError};
use ntex::http::StatusCode;
use ntex::web::{DefaultError, WebResponseError};
/// A set of errors that can occur during parsing multipart streams
#[derive(Debug, Display, From)]
pub enum MultipartError {
/// Content-Type header is not found
#[display(fmt = "No Content-type header found")]
NoContentType,
/// Can not parse Content-Type header
#[display(fmt = "Can not parse Content-Type header")]
ParseContentType,
/// Multipart boundary is not found
#[display(fmt = "Multipart boundary is not found")]
Boundary,
/// Nested multipart is not supported
#[display(fmt = "Nested multipart is not supported")]
Nested,
/// Multipart stream is incomplete
#[display(fmt = "Multipart stream is incomplete")]
Incomplete,
/// Error during field parsing
#[display(fmt = "{}", _0)]
Parse(ParseError),
/// Payload error
#[display(fmt = "{}", _0)]
Payload(PayloadError),
/// Not consumed
#[display(fmt = "Multipart stream is not consumed")]
NotConsumed,
}
impl std::error::Error for MultipartError {}
/// Return `BadRequest` for `MultipartError`
impl WebResponseError<DefaultError> for MultipartError {
fn status_code(&self) -> StatusCode {
StatusCode::BAD_REQUEST
}
}
#[cfg(test)]
mod tests {
use super::*;
use ntex::web::HttpResponse;
#[test]
fn test_multipart_error() {
let resp: HttpResponse = MultipartError::Boundary.error_response();
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
}
}

View file

@ -0,0 +1,40 @@
use futures::future::{ok, Ready};
use ntex::http::Payload;
use ntex::web::{ErrorRenderer, FromRequest, HttpRequest};
use crate::server::Multipart;
/// Get request's payload as multipart stream
///
/// Content-type: multipart/form-data;
///
/// ## Server example
///
/// ```rust
/// use futures::{Stream, StreamExt};
/// use ntex::web::{HttpResponse, Error};
/// use ntex_multipart as mp;
///
/// async fn index(mut payload: mp::Multipart) -> Result<HttpResponse, Error> {
/// // iterate over multipart stream
/// while let Some(item) = payload.next().await {
/// let mut field = item?;
///
/// // Field in turn is stream of *Bytes* object
/// while let Some(chunk) = field.next().await {
/// println!("-- CHUNK: \n{:?}", std::str::from_utf8(&chunk?));
/// }
/// }
/// Ok(HttpResponse::Ok().into())
/// }
/// # fn main() {}
/// ```
impl<Err: ErrorRenderer> FromRequest<Err> for Multipart {
type Error = Err::Container;
type Future = Ready<Result<Self, Self::Error>>;
#[inline]
fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future {
ok(Multipart::new(req.headers(), payload.take()))
}
}

View file

@ -0,0 +1,8 @@
#![allow(clippy::borrow_interior_mutable_const)]
mod error;
mod extractor;
mod server;
pub use self::error::MultipartError;
pub use self::server::{Field, Multipart};

View file

@ -0,0 +1,793 @@
//! Multipart payload support
use std::cell::{Cell, RefCell, RefMut};
use std::convert::TryFrom;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::{cmp, fmt};
use bytes::{Bytes, BytesMut};
use futures::stream::{LocalBoxStream, Stream, StreamExt};
use httparse;
use mime;
use ntex::http::error::{ParseError, PayloadError};
use ntex::http::header::{self, HeaderMap, HeaderName, HeaderValue};
use ntex::task::LocalWaker;
use crate::error::MultipartError;
const MAX_HEADERS: usize = 32;
/// The server-side implementation of `multipart/form-data` requests.
///
/// This will parse the incoming stream into `MultipartItem` instances via its
/// Stream implementation.
/// `MultipartItem::Field` contains multipart field. `MultipartItem::Multipart`
/// is used for nested multipart streams.
pub struct Multipart {
safety: Safety,
error: Option<MultipartError>,
inner: Option<Rc<RefCell<InnerMultipart>>>,
}
enum InnerMultipartItem {
None,
Field(Rc<RefCell<InnerField>>),
}
#[derive(PartialEq, Debug)]
enum InnerState {
/// Stream eof
Eof,
/// Skip data until first boundary
FirstBoundary,
/// Reading boundary
Boundary,
/// Reading Headers,
Headers,
}
struct InnerMultipart {
payload: PayloadRef,
boundary: String,
state: InnerState,
item: InnerMultipartItem,
}
impl Multipart {
/// Create multipart instance for boundary.
pub fn new<S>(headers: &HeaderMap, stream: S) -> Multipart
where
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin + 'static,
{
match Self::boundary(headers) {
Ok(boundary) => Multipart {
error: None,
safety: Safety::new(),
inner: Some(Rc::new(RefCell::new(InnerMultipart {
boundary,
payload: PayloadRef::new(PayloadBuffer::new(Box::new(stream))),
state: InnerState::FirstBoundary,
item: InnerMultipartItem::None,
}))),
},
Err(err) => Multipart {
error: Some(err),
safety: Safety::new(),
inner: None,
},
}
}
/// Extract boundary info from headers.
fn boundary(headers: &HeaderMap) -> Result<String, MultipartError> {
if let Some(content_type) = headers.get(&header::CONTENT_TYPE) {
if let Ok(content_type) = content_type.to_str() {
if let Ok(ct) = content_type.parse::<mime::Mime>() {
if let Some(boundary) = ct.get_param(mime::BOUNDARY) {
Ok(boundary.as_str().to_owned())
} else {
Err(MultipartError::Boundary)
}
} else {
Err(MultipartError::ParseContentType)
}
} else {
Err(MultipartError::ParseContentType)
}
} else {
Err(MultipartError::NoContentType)
}
}
}
impl Stream for Multipart {
type Item = Result<Field, MultipartError>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Self::Item>> {
if let Some(err) = self.error.take() {
Poll::Ready(Some(Err(err)))
} else if self.safety.current() {
let this = self.get_mut();
let mut inner = this.inner.as_mut().unwrap().borrow_mut();
if let Some(mut payload) = inner.payload.get_mut(&this.safety) {
payload.poll_stream(cx)?;
}
inner.poll(&this.safety, cx)
} else if !self.safety.is_clean() {
Poll::Ready(Some(Err(MultipartError::NotConsumed)))
} else {
Poll::Pending
}
}
}
impl InnerMultipart {
fn read_headers(
payload: &mut PayloadBuffer,
) -> Result<Option<HeaderMap>, MultipartError> {
match payload.read_until(b"\r\n\r\n")? {
None => {
if payload.eof {
Err(MultipartError::Incomplete)
} else {
Ok(None)
}
}
Some(bytes) => {
let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS];
match httparse::parse_headers(&bytes, &mut hdrs) {
Ok(httparse::Status::Complete((_, hdrs))) => {
// convert headers
let mut headers = HeaderMap::with_capacity(hdrs.len());
for h in hdrs {
if let Ok(name) = HeaderName::try_from(h.name) {
if let Ok(value) = HeaderValue::try_from(h.value) {
headers.append(name, value);
} else {
return Err(ParseError::Header.into());
}
} else {
return Err(ParseError::Header.into());
}
}
Ok(Some(headers))
}
Ok(httparse::Status::Partial) => Err(ParseError::Header.into()),
Err(err) => Err(ParseError::from(err).into()),
}
}
}
}
fn read_boundary(
payload: &mut PayloadBuffer,
boundary: &str,
) -> Result<Option<bool>, MultipartError> {
// TODO: need to read epilogue
match payload.readline_or_eof()? {
None => {
if payload.eof {
Ok(Some(true))
} else {
Ok(None)
}
}
Some(chunk) => {
if chunk.len() < boundary.len() + 4
|| &chunk[..2] != b"--"
|| &chunk[2..boundary.len() + 2] != boundary.as_bytes()
{
Err(MultipartError::Boundary)
} else if &chunk[boundary.len() + 2..] == b"\r\n" {
Ok(Some(false))
} else if &chunk[boundary.len() + 2..boundary.len() + 4] == b"--"
&& (chunk.len() == boundary.len() + 4
|| &chunk[boundary.len() + 4..] == b"\r\n")
{
Ok(Some(true))
} else {
Err(MultipartError::Boundary)
}
}
}
}
fn skip_until_boundary(
payload: &mut PayloadBuffer,
boundary: &str,
) -> Result<Option<bool>, MultipartError> {
let mut eof = false;
loop {
match payload.readline()? {
Some(chunk) => {
if chunk.is_empty() {
return Err(MultipartError::Boundary);
}
if chunk.len() < boundary.len() {
continue;
}
if &chunk[..2] == b"--"
&& &chunk[2..chunk.len() - 2] == boundary.as_bytes()
{
break;
} else {
if chunk.len() < boundary.len() + 2 {
continue;
}
let b: &[u8] = boundary.as_ref();
if &chunk[..boundary.len()] == b
&& &chunk[boundary.len()..boundary.len() + 2] == b"--"
{
eof = true;
break;
}
}
}
None => {
return if payload.eof {
Err(MultipartError::Incomplete)
} else {
Ok(None)
};
}
}
}
Ok(Some(eof))
}
fn poll(
&mut self,
safety: &Safety,
cx: &mut Context,
) -> Poll<Option<Result<Field, MultipartError>>> {
if self.state == InnerState::Eof {
Poll::Ready(None)
} else {
// release field
loop {
// Nested multipart streams of fields has to be consumed
// before switching to next
if safety.current() {
let stop = match self.item {
InnerMultipartItem::Field(ref mut field) => {
match field.borrow_mut().poll(safety) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Some(Ok(_))) => continue,
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(Err(e)))
}
Poll::Ready(None) => true,
}
}
InnerMultipartItem::None => false,
};
if stop {
self.item = InnerMultipartItem::None;
}
if let InnerMultipartItem::None = self.item {
break;
}
}
}
let headers = if let Some(mut payload) = self.payload.get_mut(safety) {
match self.state {
// read until first boundary
InnerState::FirstBoundary => {
match InnerMultipart::skip_until_boundary(
&mut *payload,
&self.boundary,
)? {
Some(eof) => {
if eof {
self.state = InnerState::Eof;
return Poll::Ready(None);
} else {
self.state = InnerState::Headers;
}
}
None => return Poll::Pending,
}
}
// read boundary
InnerState::Boundary => {
match InnerMultipart::read_boundary(
&mut *payload,
&self.boundary,
)? {
None => return Poll::Pending,
Some(eof) => {
if eof {
self.state = InnerState::Eof;
return Poll::Ready(None);
} else {
self.state = InnerState::Headers;
}
}
}
}
_ => (),
}
// read field headers for next field
if self.state == InnerState::Headers {
if let Some(headers) = InnerMultipart::read_headers(&mut *payload)? {
self.state = InnerState::Boundary;
headers
} else {
return Poll::Pending;
}
} else {
unreachable!()
}
} else {
log::debug!("NotReady: field is in flight");
return Poll::Pending;
};
// content type
let mut mt = mime::APPLICATION_OCTET_STREAM;
if let Some(content_type) = headers.get(&header::CONTENT_TYPE) {
if let Ok(content_type) = content_type.to_str() {
if let Ok(ct) = content_type.parse::<mime::Mime>() {
mt = ct;
}
}
}
self.state = InnerState::Boundary;
// nested multipart stream
if mt.type_() == mime::MULTIPART {
Poll::Ready(Some(Err(MultipartError::Nested)))
} else {
let field = Rc::new(RefCell::new(InnerField::new(
self.payload.clone(),
self.boundary.clone(),
&headers,
)?));
self.item = InnerMultipartItem::Field(Rc::clone(&field));
Poll::Ready(Some(Ok(Field::new(safety.clone(cx), headers, mt, field))))
}
}
}
}
impl Drop for InnerMultipart {
fn drop(&mut self) {
// InnerMultipartItem::Field has to be dropped first because of Safety.
self.item = InnerMultipartItem::None;
}
}
/// A single field in a multipart stream
pub struct Field {
ct: mime::Mime,
headers: HeaderMap,
inner: Rc<RefCell<InnerField>>,
safety: Safety,
}
impl Field {
fn new(
safety: Safety,
headers: HeaderMap,
ct: mime::Mime,
inner: Rc<RefCell<InnerField>>,
) -> Self {
Field {
ct,
headers,
inner,
safety,
}
}
/// Get a map of headers
pub fn headers(&self) -> &HeaderMap {
&self.headers
}
/// Get the content type of the field
pub fn content_type(&self) -> &mime::Mime {
&self.ct
}
}
impl Stream for Field {
type Item = Result<Bytes, MultipartError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if self.safety.current() {
let mut inner = self.inner.borrow_mut();
if let Some(mut payload) =
inner.payload.as_ref().unwrap().get_mut(&self.safety)
{
payload.poll_stream(cx)?;
}
inner.poll(&self.safety)
} else if !self.safety.is_clean() {
Poll::Ready(Some(Err(MultipartError::NotConsumed)))
} else {
Poll::Pending
}
}
}
impl fmt::Debug for Field {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(f, "\nField: {}", self.ct)?;
writeln!(f, " boundary: {}", self.inner.borrow().boundary)?;
writeln!(f, " headers:")?;
for (key, val) in self.headers.iter() {
writeln!(f, " {:?}: {:?}", key, val)?;
}
Ok(())
}
}
struct InnerField {
payload: Option<PayloadRef>,
boundary: String,
eof: bool,
length: Option<u64>,
}
impl InnerField {
fn new(
payload: PayloadRef,
boundary: String,
headers: &HeaderMap,
) -> Result<InnerField, PayloadError> {
let len = if let Some(len) = headers.get(&header::CONTENT_LENGTH) {
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<u64>() {
Some(len)
} else {
return Err(PayloadError::Incomplete(None));
}
} else {
return Err(PayloadError::Incomplete(None));
}
} else {
None
};
Ok(InnerField {
boundary,
payload: Some(payload),
eof: false,
length: len,
})
}
/// Reads body part content chunk of the specified size.
/// The body part must has `Content-Length` header with proper value.
fn read_len(
payload: &mut PayloadBuffer,
size: &mut u64,
) -> Poll<Option<Result<Bytes, MultipartError>>> {
if *size == 0 {
Poll::Ready(None)
} else {
match payload.read_max(*size)? {
Some(mut chunk) => {
let len = cmp::min(chunk.len() as u64, *size);
*size -= len;
let ch = chunk.split_to(len as usize);
if !chunk.is_empty() {
payload.unprocessed(chunk);
}
Poll::Ready(Some(Ok(ch)))
}
None => {
if payload.eof && (*size != 0) {
Poll::Ready(Some(Err(MultipartError::Incomplete)))
} else {
Poll::Pending
}
}
}
}
}
/// Reads content chunk of body part with unknown length.
/// The `Content-Length` header for body part is not necessary.
fn read_stream(
payload: &mut PayloadBuffer,
boundary: &str,
) -> Poll<Option<Result<Bytes, MultipartError>>> {
let mut pos = 0;
let len = payload.buf.len();
if len == 0 {
return if payload.eof {
Poll::Ready(Some(Err(MultipartError::Incomplete)))
} else {
Poll::Pending
};
}
// check boundary
if len > 4 && payload.buf[0] == b'\r' {
let b_len = if &payload.buf[..2] == b"\r\n" && &payload.buf[2..4] == b"--" {
Some(4)
} else if &payload.buf[1..3] == b"--" {
Some(3)
} else {
None
};
if let Some(b_len) = b_len {
let b_size = boundary.len() + b_len;
if len < b_size {
return Poll::Pending;
} else if &payload.buf[b_len..b_size] == boundary.as_bytes() {
// found boundary
return Poll::Ready(None);
}
}
}
loop {
return if let Some(idx) = twoway::find_bytes(&payload.buf[pos..], b"\r") {
let cur = pos + idx;
// check if we have enough data for boundary detection
if cur + 4 > len {
if cur > 0 {
Poll::Ready(Some(Ok(payload.buf.split_to(cur).freeze())))
} else {
Poll::Pending
}
} else {
// check boundary
if (&payload.buf[cur..cur + 2] == b"\r\n"
&& &payload.buf[cur + 2..cur + 4] == b"--")
|| (&payload.buf[cur..=cur] == b"\r"
&& &payload.buf[cur + 1..cur + 3] == b"--")
{
if cur != 0 {
// return buffer
Poll::Ready(Some(Ok(payload.buf.split_to(cur).freeze())))
} else {
pos = cur + 1;
continue;
}
} else {
// not boundary
pos = cur + 1;
continue;
}
}
} else {
Poll::Ready(Some(Ok(payload.buf.split().freeze())))
};
}
}
fn poll(&mut self, s: &Safety) -> Poll<Option<Result<Bytes, MultipartError>>> {
if self.payload.is_none() {
return Poll::Ready(None);
}
let result = if let Some(mut payload) = self.payload.as_ref().unwrap().get_mut(s)
{
if !self.eof {
let res = if let Some(ref mut len) = self.length {
InnerField::read_len(&mut *payload, len)
} else {
InnerField::read_stream(&mut *payload, &self.boundary)
};
match res {
Poll::Pending => return Poll::Pending,
Poll::Ready(Some(Ok(bytes))) => return Poll::Ready(Some(Ok(bytes))),
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(None) => self.eof = true,
}
}
match payload.readline() {
Ok(None) => Poll::Pending,
Ok(Some(line)) => {
if line.as_ref() != b"\r\n" {
log::warn!("multipart field did not read all the data or it is malformed");
}
Poll::Ready(None)
}
Err(e) => Poll::Ready(Some(Err(e))),
}
} else {
Poll::Pending
};
if let Poll::Ready(None) = result {
self.payload.take();
}
result
}
}
struct PayloadRef {
payload: Rc<RefCell<PayloadBuffer>>,
}
impl PayloadRef {
fn new(payload: PayloadBuffer) -> PayloadRef {
PayloadRef {
payload: Rc::new(payload.into()),
}
}
fn get_mut<'a, 'b>(&'a self, s: &'b Safety) -> Option<RefMut<'a, PayloadBuffer>>
where
'a: 'b,
{
if s.current() {
Some(self.payload.borrow_mut())
} else {
None
}
}
}
impl Clone for PayloadRef {
fn clone(&self) -> PayloadRef {
PayloadRef {
payload: Rc::clone(&self.payload),
}
}
}
/// Counter. It tracks of number of clones of payloads and give access to
/// payload only to top most task panics if Safety get destroyed and it not top
/// most task.
#[derive(Debug)]
struct Safety {
task: LocalWaker,
level: usize,
payload: Rc<PhantomData<bool>>,
clean: Rc<Cell<bool>>,
}
impl Safety {
fn new() -> Safety {
let payload = Rc::new(PhantomData);
Safety {
task: LocalWaker::new(),
level: Rc::strong_count(&payload),
clean: Rc::new(Cell::new(true)),
payload,
}
}
fn current(&self) -> bool {
Rc::strong_count(&self.payload) == self.level && self.clean.get()
}
fn is_clean(&self) -> bool {
self.clean.get()
}
fn clone(&self, cx: &mut Context) -> Safety {
let payload = Rc::clone(&self.payload);
let s = Safety {
task: LocalWaker::new(),
level: Rc::strong_count(&payload),
clean: self.clean.clone(),
payload,
};
s.task.register(cx.waker());
s
}
}
impl Drop for Safety {
fn drop(&mut self) {
// parent task is dead
if Rc::strong_count(&self.payload) != self.level {
self.clean.set(true);
}
if let Some(task) = self.task.take() {
task.wake()
}
}
}
/// Payload buffer
struct PayloadBuffer {
eof: bool,
buf: BytesMut,
stream: LocalBoxStream<'static, Result<Bytes, PayloadError>>,
}
impl PayloadBuffer {
/// Create new `PayloadBuffer` instance
fn new<S>(stream: S) -> Self
where
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
PayloadBuffer {
eof: false,
buf: BytesMut::new(),
stream: stream.boxed_local(),
}
}
fn poll_stream(&mut self, cx: &mut Context) -> Result<(), PayloadError> {
loop {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Some(Ok(data))) => self.buf.extend_from_slice(&data),
Poll::Ready(Some(Err(e))) => return Err(e),
Poll::Ready(None) => {
self.eof = true;
return Ok(());
}
Poll::Pending => return Ok(()),
}
}
}
/// Read exact number of bytes
#[cfg(test)]
fn read_exact(&mut self, size: usize) -> Option<Bytes> {
if size <= self.buf.len() {
Some(self.buf.split_to(size).freeze())
} else {
None
}
}
fn read_max(&mut self, size: u64) -> Result<Option<Bytes>, MultipartError> {
if !self.buf.is_empty() {
let size = std::cmp::min(self.buf.len() as u64, size) as usize;
Ok(Some(self.buf.split_to(size).freeze()))
} else if self.eof {
Err(MultipartError::Incomplete)
} else {
Ok(None)
}
}
/// Read until specified ending
pub fn read_until(&mut self, line: &[u8]) -> Result<Option<Bytes>, MultipartError> {
let res = twoway::find_bytes(&self.buf, line)
.map(|idx| self.buf.split_to(idx + line.len()).freeze());
if res.is_none() && self.eof {
Err(MultipartError::Incomplete)
} else {
Ok(res)
}
}
/// Read bytes until new line delimiter
pub fn readline(&mut self) -> Result<Option<Bytes>, MultipartError> {
self.read_until(b"\n")
}
/// Read bytes until new line delimiter or eof
pub fn readline_or_eof(&mut self) -> Result<Option<Bytes>, MultipartError> {
match self.readline() {
Err(MultipartError::Incomplete) if self.eof => {
Ok(Some(self.buf.split().freeze()))
}
line => line,
}
}
/// Put unprocessed data back to the buffer
pub fn unprocessed(&mut self, data: Bytes) {
let buf = BytesMut::from(data.as_ref());
let buf = std::mem::replace(&mut self.buf, buf);
self.buf.extend_from_slice(&buf);
}
}

2
rustfmt.toml Normal file
View file

@ -0,0 +1,2 @@
max_width = 89
reorder_imports = true