From 69809a70487ed1c7b3ae8506a8845a6c7f46ffc1 Mon Sep 17 00:00:00 2001 From: krircc <18028295+krircc@users.noreply.github.com> Date: Sun, 5 Apr 2020 12:28:02 +0800 Subject: [PATCH] add ntex-multipart (#1) Co-authored-by: krircc --- .github/workflows/linux.yml | 79 ++++ .github/workflows/osx.yml | 65 +++ .github/workflows/windows.yml | 49 ++ .gitignore | 13 + Cargo.toml | 4 + LICENSE | 25 + README.md | 28 +- ntex-multipart/CHANGES.md | 5 + ntex-multipart/Cargo.toml | 25 + ntex-multipart/LICENSE | 25 + ntex-multipart/README.md | 7 + ntex-multipart/src/error.rs | 55 +++ ntex-multipart/src/extractor.rs | 40 ++ ntex-multipart/src/lib.rs | 8 + ntex-multipart/src/server.rs | 793 ++++++++++++++++++++++++++++++++ rustfmt.toml | 2 + 16 files changed, 1221 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/linux.yml create mode 100644 .github/workflows/osx.yml create mode 100644 .github/workflows/windows.yml create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 LICENSE create mode 100644 ntex-multipart/CHANGES.md create mode 100644 ntex-multipart/Cargo.toml create mode 100644 ntex-multipart/LICENSE create mode 100644 ntex-multipart/README.md create mode 100644 ntex-multipart/src/error.rs create mode 100644 ntex-multipart/src/extractor.rs create mode 100644 ntex-multipart/src/lib.rs create mode 100644 ntex-multipart/src/server.rs create mode 100644 rustfmt.toml diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml new file mode 100644 index 00000000..d451f017 --- /dev/null +++ b/.github/workflows/linux.yml @@ -0,0 +1,79 @@ +name: CI (Linux) + +on: [push, pull_request] + +jobs: + build_and_test: + strategy: + fail-fast: false + matrix: + version: + - 1.42.0 # MSRV + - stable + - nightly + + name: ${{ matrix.version }} - x86_64-unknown-linux-gnu + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@master + + - name: Install ${{ matrix.version }} + uses: actions-rs/toolchain@v1 + with: + toolchain: ${{ matrix.version }}-x86_64-unknown-linux-gnu + profile: minimal + override: true + + - name: Generate Cargo.lock + uses: actions-rs/cargo@v1 + with: + command: generate-lockfile + + - name: Cache cargo registry + uses: actions/cache@v1 + with: + path: ~/.cargo/registry + key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-registry-trimmed-${{ hashFiles('**/Cargo.lock') }} + + - name: Cache cargo index + uses: actions/cache@v1 + with: + path: ~/.cargo/git + key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-index-trimmed-${{ hashFiles('**/Cargo.lock') }} + + - name: Cache cargo build + uses: actions/cache@v1 + with: + path: target + key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }} + + - name: check build + uses: actions-rs/cargo@v1 + with: + command: check + args: --all --all-features --bins --examples --tests + + - name: tests + uses: actions-rs/cargo@v1 + timeout-minutes: 40 + with: + command: test + args: --all --all-features --no-fail-fast -- --nocapture + + - name: Generate coverage file + if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request') + run: | + cargo install cargo-tarpaulin + cargo tarpaulin --out Xml --all --all-features + + - name: Upload to Codecov + if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request') + uses: codecov/codecov-action@v1 + with: + file: cobertura.xml + + - name: Clear the cargo caches + run: | + cargo install cargo-cache 
--no-default-features --features ci-autoclean + cargo-cache diff --git a/.github/workflows/osx.yml b/.github/workflows/osx.yml new file mode 100644 index 00000000..cb3ad867 --- /dev/null +++ b/.github/workflows/osx.yml @@ -0,0 +1,65 @@ +name: CI (OSX) + +on: [push, pull_request] + +jobs: + build_and_test: + strategy: + fail-fast: false + matrix: + version: + - stable + - nightly + + name: ${{ matrix.version }} - x86_64-apple-darwin + runs-on: macOS-latest + + steps: + - uses: actions/checkout@master + + - name: Install ${{ matrix.version }} + uses: actions-rs/toolchain@v1 + with: + toolchain: ${{ matrix.version }}-x86_64-apple-darwin + profile: minimal + override: true + + - name: Generate Cargo.lock + uses: actions-rs/cargo@v1 + with: + command: generate-lockfile + + - name: Cache cargo registry + uses: actions/cache@v1 + with: + path: ~/.cargo/registry + key: ${{ matrix.version }}-x86_64-apple-darwin-cargo-registry-trimmed-${{ hashFiles('**/Cargo.lock') }} + + - name: Cache cargo index + uses: actions/cache@v1 + with: + path: ~/.cargo/git + key: ${{ matrix.version }}-x86_64-apple-darwin-cargo-index-trimmed-${{ hashFiles('**/Cargo.lock') }} + + - name: Cache cargo build + uses: actions/cache@v1 + with: + path: target + key: ${{ matrix.version }}-x86_64-apple-darwin-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }} + + - name: check build + uses: actions-rs/cargo@v1 + with: + command: check + args: --all --all-features --bins --examples --tests + + - name: tests + uses: actions-rs/cargo@v1 + with: + command: test + args: --all --all-features --no-fail-fast -- --nocapture + + - name: Clear the cargo caches + run: | + cargo install cargo-cache --no-default-features --features ci-autoclean + cargo-cache diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml new file mode 100644 index 00000000..7c5c4287 --- /dev/null +++ b/.github/workflows/windows.yml @@ -0,0 +1,49 @@ +name: CI (Windows) + +on: [push, pull_request] + +env: + VCPKGRS_DYNAMIC: 1 + +jobs: + build_and_test: + strategy: + fail-fast: false + matrix: + version: + - stable + - nightly + + name: ${{ matrix.version }} - x86_64-pc-windows-msvc + runs-on: windows-latest + + steps: + - uses: actions/checkout@master + + - name: Install ${{ matrix.version }} + uses: actions-rs/toolchain@v1 + with: + toolchain: ${{ matrix.version }}-x86_64-pc-windows-msvc + profile: minimal + override: true + + - name: Install OpenSSL + run: | + vcpkg integrate install + vcpkg install openssl:x64-windows + Copy-Item C:\vcpkg\installed\x64-windows\bin\libcrypto-1_1-x64.dll C:\vcpkg\installed\x64-windows\bin\libcrypto.dll + Copy-Item C:\vcpkg\installed\x64-windows\bin\libssl-1_1-x64.dll C:\vcpkg\installed\x64-windows\bin\libssl.dll + Get-ChildItem C:\vcpkg\installed\x64-windows\bin + Get-ChildItem C:\vcpkg\installed\x64-windows\lib + + - name: check build + uses: actions-rs/cargo@v1 + with: + command: check + args: --all --all-features --bins --examples --tests + + - name: tests + uses: actions-rs/cargo@v1 + with: + command: test + args: --all --all-features --no-fail-fast -- --nocapture diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..93b7b2d1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +/target +Cargo.lock +guide/build/ +/gh-pages + +*.so +*.out +*.pid +*.sock +*~ + +# These are backup files generated by rustfmt +**/*.rs.bk diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 00000000..99f84fc3 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,4 @@ +[workspace] +members = [ + "ntex-multipart", +] \ No 
newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 00000000..0eed26a7 --- /dev/null +++ b/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2020 Nikolay Kim, krircc + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md index 6c2130df..515a1c89 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,26 @@ -ntex-extras -=========== \ No newline at end of file +
+# ntex-extras
+
+A collection of additional crates supporting the ntex framework.
+
+[![Build Status](https://github.com/ntex-rs/ntex-extras/workflows/CI%20(Linux)/badge.svg)](https://github.com/ntex-rs/ntex-extras/actions)
+[![Version](https://img.shields.io/badge/rustc-1.42+-lightgray.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.42.html)
+![License](https://img.shields.io/crates/l/ntex-extras.svg)
+ +## Crates + +| Crate | | | +| -------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------- | +| [ntex-multipart] | [![crates.io](https://img.shields.io/crates/v/ntex-multipart)](https://crates.io/crates/ntex-multipart) [![Documentation](https://docs.rs/ntex-multipart/badge.svg)](https://docs.rs/ntex-multipart) | Multipart support for ntex applications. | + + +[ntex-multipart]: ntex-multipart + +## License + +This project is licensed under + +* MIT license ([LICENSE-MIT](LICENSE-MIT) or [http://opensource.org/licenses/MIT](http://opensource.org/licenses/MIT)) diff --git a/ntex-multipart/CHANGES.md b/ntex-multipart/CHANGES.md new file mode 100644 index 00000000..b203b14e --- /dev/null +++ b/ntex-multipart/CHANGES.md @@ -0,0 +1,5 @@ +# Changes + +## [0.1.0] - 2020-04-05 + +* Fork to ntex namespace diff --git a/ntex-multipart/Cargo.toml b/ntex-multipart/Cargo.toml new file mode 100644 index 00000000..60d58118 --- /dev/null +++ b/ntex-multipart/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "ntex-multipart" +version = "0.1.0" +authors = ["krircc "] +description = "Multipart support for ntex framework." +readme = "README.md" +keywords = ["http", "web", "framework", "async", "futures"] +homepage = "https://ntex.rs" +repository = "https://github.com/ntex/ntex-extras.git" +documentation = "https://docs.rs/ntex-multipart/" +license = "MIT" +edition = "2018" + +[dependencies] +ntex = "0.1.1" +bytes = "0.5.3" +derive_more = "0.99.2" +httparse = "1.3" +futures = "0.3.1" +log = "0.4" +mime = "0.3" +twoway = "0.2" + +[dev-dependencies] +ntex = "0.1.1" \ No newline at end of file diff --git a/ntex-multipart/LICENSE b/ntex-multipart/LICENSE new file mode 100644 index 00000000..a7f2f828 --- /dev/null +++ b/ntex-multipart/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2020 krircc + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. 
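For orientation, here is a minimal sketch of how a downstream application might consume the new crate. The handler body mirrors the doc example added in `ntex-multipart/src/extractor.rs` below; the route path, bind address, `#[ntex::main]` entry point, and the `App::route`/`web::post()` calls are assumptions about the ntex 0.1 web API (which tracks actix-web 2.x), not something this patch defines.

```rust
use futures::StreamExt;
use ntex::web::{self, App, Error, HttpResponse, HttpServer};
use ntex_multipart::Multipart;

// Accept a `multipart/form-data` upload and drain every field.
async fn upload(mut payload: Multipart) -> Result<HttpResponse, Error> {
    while let Some(item) = payload.next().await {
        let mut field = item?;
        // Each field is itself a stream of `Bytes` chunks, so large uploads
        // are processed incrementally instead of being buffered in memory.
        while let Some(chunk) = field.next().await {
            println!("field chunk: {} bytes", chunk?.len());
        }
    }
    Ok(HttpResponse::Ok().into())
}

// Illustrative wiring only; adjust to your application's server setup.
#[ntex::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().route("/upload", web::post().to(upload)))
        .bind("127.0.0.1:8080")?
        .run()
        .await
}
```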
diff --git a/ntex-multipart/README.md b/ntex-multipart/README.md new file mode 100644 index 00000000..f252f263 --- /dev/null +++ b/ntex-multipart/README.md @@ -0,0 +1,7 @@ +# Multipart support for ntex framework + +## Documentation & community resources + +* [API Documentation](https://docs.rs/ntex-multipart/) +* Cargo package: [actix-multipart](https://crates.io/crates/ntex-multipart) +* Minimum supported Rust version: 1.42 or later diff --git a/ntex-multipart/src/error.rs b/ntex-multipart/src/error.rs new file mode 100644 index 00000000..44b5d3d3 --- /dev/null +++ b/ntex-multipart/src/error.rs @@ -0,0 +1,55 @@ +//! Error and Result module +use derive_more::{Display, From}; +use ntex::http::error::{ParseError, PayloadError}; +use ntex::http::StatusCode; +use ntex::web::{DefaultError, WebResponseError}; + +/// A set of errors that can occur during parsing multipart streams +#[derive(Debug, Display, From)] +pub enum MultipartError { + /// Content-Type header is not found + #[display(fmt = "No Content-type header found")] + NoContentType, + /// Can not parse Content-Type header + #[display(fmt = "Can not parse Content-Type header")] + ParseContentType, + /// Multipart boundary is not found + #[display(fmt = "Multipart boundary is not found")] + Boundary, + /// Nested multipart is not supported + #[display(fmt = "Nested multipart is not supported")] + Nested, + /// Multipart stream is incomplete + #[display(fmt = "Multipart stream is incomplete")] + Incomplete, + /// Error during field parsing + #[display(fmt = "{}", _0)] + Parse(ParseError), + /// Payload error + #[display(fmt = "{}", _0)] + Payload(PayloadError), + /// Not consumed + #[display(fmt = "Multipart stream is not consumed")] + NotConsumed, +} + +impl std::error::Error for MultipartError {} + +/// Return `BadRequest` for `MultipartError` +impl WebResponseError for MultipartError { + fn status_code(&self) -> StatusCode { + StatusCode::BAD_REQUEST + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ntex::web::HttpResponse; + + #[test] + fn test_multipart_error() { + let resp: HttpResponse = MultipartError::Boundary.error_response(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } +} diff --git a/ntex-multipart/src/extractor.rs b/ntex-multipart/src/extractor.rs new file mode 100644 index 00000000..a64876c4 --- /dev/null +++ b/ntex-multipart/src/extractor.rs @@ -0,0 +1,40 @@ +use futures::future::{ok, Ready}; +use ntex::http::Payload; +use ntex::web::{ErrorRenderer, FromRequest, HttpRequest}; + +use crate::server::Multipart; + +/// Get request's payload as multipart stream +/// +/// Content-type: multipart/form-data; +/// +/// ## Server example +/// +/// ```rust +/// use futures::{Stream, StreamExt}; +/// use ntex::web::{HttpResponse, Error}; +/// use ntex_multipart as mp; +/// +/// async fn index(mut payload: mp::Multipart) -> Result { +/// // iterate over multipart stream +/// while let Some(item) = payload.next().await { +/// let mut field = item?; +/// +/// // Field in turn is stream of *Bytes* object +/// while let Some(chunk) = field.next().await { +/// println!("-- CHUNK: \n{:?}", std::str::from_utf8(&chunk?)); +/// } +/// } +/// Ok(HttpResponse::Ok().into()) +/// } +/// # fn main() {} +/// ``` +impl FromRequest for Multipart { + type Error = Err::Container; + type Future = Ready>; + + #[inline] + fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { + ok(Multipart::new(req.headers(), payload.take())) + } +} diff --git a/ntex-multipart/src/lib.rs b/ntex-multipart/src/lib.rs new file mode 
100644 index 00000000..43eb048c --- /dev/null +++ b/ntex-multipart/src/lib.rs @@ -0,0 +1,8 @@ +#![allow(clippy::borrow_interior_mutable_const)] + +mod error; +mod extractor; +mod server; + +pub use self::error::MultipartError; +pub use self::server::{Field, Multipart}; diff --git a/ntex-multipart/src/server.rs b/ntex-multipart/src/server.rs new file mode 100644 index 00000000..6b2011fa --- /dev/null +++ b/ntex-multipart/src/server.rs @@ -0,0 +1,793 @@ +//! Multipart payload support +use std::cell::{Cell, RefCell, RefMut}; +use std::convert::TryFrom; +use std::marker::PhantomData; +use std::pin::Pin; +use std::rc::Rc; +use std::task::{Context, Poll}; +use std::{cmp, fmt}; + +use bytes::{Bytes, BytesMut}; +use futures::stream::{LocalBoxStream, Stream, StreamExt}; +use httparse; +use mime; + +use ntex::http::error::{ParseError, PayloadError}; +use ntex::http::header::{self, HeaderMap, HeaderName, HeaderValue}; +use ntex::task::LocalWaker; + +use crate::error::MultipartError; + +const MAX_HEADERS: usize = 32; + +/// The server-side implementation of `multipart/form-data` requests. +/// +/// This will parse the incoming stream into `MultipartItem` instances via its +/// Stream implementation. +/// `MultipartItem::Field` contains multipart field. `MultipartItem::Multipart` +/// is used for nested multipart streams. +pub struct Multipart { + safety: Safety, + error: Option, + inner: Option>>, +} + +enum InnerMultipartItem { + None, + Field(Rc>), +} + +#[derive(PartialEq, Debug)] +enum InnerState { + /// Stream eof + Eof, + /// Skip data until first boundary + FirstBoundary, + /// Reading boundary + Boundary, + /// Reading Headers, + Headers, +} + +struct InnerMultipart { + payload: PayloadRef, + boundary: String, + state: InnerState, + item: InnerMultipartItem, +} + +impl Multipart { + /// Create multipart instance for boundary. + pub fn new(headers: &HeaderMap, stream: S) -> Multipart + where + S: Stream> + Unpin + 'static, + { + match Self::boundary(headers) { + Ok(boundary) => Multipart { + error: None, + safety: Safety::new(), + inner: Some(Rc::new(RefCell::new(InnerMultipart { + boundary, + payload: PayloadRef::new(PayloadBuffer::new(Box::new(stream))), + state: InnerState::FirstBoundary, + item: InnerMultipartItem::None, + }))), + }, + Err(err) => Multipart { + error: Some(err), + safety: Safety::new(), + inner: None, + }, + } + } + + /// Extract boundary info from headers. 
+ fn boundary(headers: &HeaderMap) -> Result { + if let Some(content_type) = headers.get(&header::CONTENT_TYPE) { + if let Ok(content_type) = content_type.to_str() { + if let Ok(ct) = content_type.parse::() { + if let Some(boundary) = ct.get_param(mime::BOUNDARY) { + Ok(boundary.as_str().to_owned()) + } else { + Err(MultipartError::Boundary) + } + } else { + Err(MultipartError::ParseContentType) + } + } else { + Err(MultipartError::ParseContentType) + } + } else { + Err(MultipartError::NoContentType) + } + } +} + +impl Stream for Multipart { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll> { + if let Some(err) = self.error.take() { + Poll::Ready(Some(Err(err))) + } else if self.safety.current() { + let this = self.get_mut(); + let mut inner = this.inner.as_mut().unwrap().borrow_mut(); + if let Some(mut payload) = inner.payload.get_mut(&this.safety) { + payload.poll_stream(cx)?; + } + inner.poll(&this.safety, cx) + } else if !self.safety.is_clean() { + Poll::Ready(Some(Err(MultipartError::NotConsumed))) + } else { + Poll::Pending + } + } +} + +impl InnerMultipart { + fn read_headers( + payload: &mut PayloadBuffer, + ) -> Result, MultipartError> { + match payload.read_until(b"\r\n\r\n")? { + None => { + if payload.eof { + Err(MultipartError::Incomplete) + } else { + Ok(None) + } + } + Some(bytes) => { + let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS]; + match httparse::parse_headers(&bytes, &mut hdrs) { + Ok(httparse::Status::Complete((_, hdrs))) => { + // convert headers + let mut headers = HeaderMap::with_capacity(hdrs.len()); + for h in hdrs { + if let Ok(name) = HeaderName::try_from(h.name) { + if let Ok(value) = HeaderValue::try_from(h.value) { + headers.append(name, value); + } else { + return Err(ParseError::Header.into()); + } + } else { + return Err(ParseError::Header.into()); + } + } + Ok(Some(headers)) + } + Ok(httparse::Status::Partial) => Err(ParseError::Header.into()), + Err(err) => Err(ParseError::from(err).into()), + } + } + } + } + + fn read_boundary( + payload: &mut PayloadBuffer, + boundary: &str, + ) -> Result, MultipartError> { + // TODO: need to read epilogue + match payload.readline_or_eof()? { + None => { + if payload.eof { + Ok(Some(true)) + } else { + Ok(None) + } + } + Some(chunk) => { + if chunk.len() < boundary.len() + 4 + || &chunk[..2] != b"--" + || &chunk[2..boundary.len() + 2] != boundary.as_bytes() + { + Err(MultipartError::Boundary) + } else if &chunk[boundary.len() + 2..] == b"\r\n" { + Ok(Some(false)) + } else if &chunk[boundary.len() + 2..boundary.len() + 4] == b"--" + && (chunk.len() == boundary.len() + 4 + || &chunk[boundary.len() + 4..] == b"\r\n") + { + Ok(Some(true)) + } else { + Err(MultipartError::Boundary) + } + } + } + } + + fn skip_until_boundary( + payload: &mut PayloadBuffer, + boundary: &str, + ) -> Result, MultipartError> { + let mut eof = false; + loop { + match payload.readline()? 
{ + Some(chunk) => { + if chunk.is_empty() { + return Err(MultipartError::Boundary); + } + if chunk.len() < boundary.len() { + continue; + } + if &chunk[..2] == b"--" + && &chunk[2..chunk.len() - 2] == boundary.as_bytes() + { + break; + } else { + if chunk.len() < boundary.len() + 2 { + continue; + } + let b: &[u8] = boundary.as_ref(); + if &chunk[..boundary.len()] == b + && &chunk[boundary.len()..boundary.len() + 2] == b"--" + { + eof = true; + break; + } + } + } + None => { + return if payload.eof { + Err(MultipartError::Incomplete) + } else { + Ok(None) + }; + } + } + } + Ok(Some(eof)) + } + + fn poll( + &mut self, + safety: &Safety, + cx: &mut Context, + ) -> Poll>> { + if self.state == InnerState::Eof { + Poll::Ready(None) + } else { + // release field + loop { + // Nested multipart streams of fields has to be consumed + // before switching to next + if safety.current() { + let stop = match self.item { + InnerMultipartItem::Field(ref mut field) => { + match field.borrow_mut().poll(safety) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Some(Ok(_))) => continue, + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(Err(e))) + } + Poll::Ready(None) => true, + } + } + InnerMultipartItem::None => false, + }; + if stop { + self.item = InnerMultipartItem::None; + } + if let InnerMultipartItem::None = self.item { + break; + } + } + } + + let headers = if let Some(mut payload) = self.payload.get_mut(safety) { + match self.state { + // read until first boundary + InnerState::FirstBoundary => { + match InnerMultipart::skip_until_boundary( + &mut *payload, + &self.boundary, + )? { + Some(eof) => { + if eof { + self.state = InnerState::Eof; + return Poll::Ready(None); + } else { + self.state = InnerState::Headers; + } + } + None => return Poll::Pending, + } + } + // read boundary + InnerState::Boundary => { + match InnerMultipart::read_boundary( + &mut *payload, + &self.boundary, + )? { + None => return Poll::Pending, + Some(eof) => { + if eof { + self.state = InnerState::Eof; + return Poll::Ready(None); + } else { + self.state = InnerState::Headers; + } + } + } + } + _ => (), + } + + // read field headers for next field + if self.state == InnerState::Headers { + if let Some(headers) = InnerMultipart::read_headers(&mut *payload)? { + self.state = InnerState::Boundary; + headers + } else { + return Poll::Pending; + } + } else { + unreachable!() + } + } else { + log::debug!("NotReady: field is in flight"); + return Poll::Pending; + }; + + // content type + let mut mt = mime::APPLICATION_OCTET_STREAM; + if let Some(content_type) = headers.get(&header::CONTENT_TYPE) { + if let Ok(content_type) = content_type.to_str() { + if let Ok(ct) = content_type.parse::() { + mt = ct; + } + } + } + + self.state = InnerState::Boundary; + + // nested multipart stream + if mt.type_() == mime::MULTIPART { + Poll::Ready(Some(Err(MultipartError::Nested))) + } else { + let field = Rc::new(RefCell::new(InnerField::new( + self.payload.clone(), + self.boundary.clone(), + &headers, + )?)); + self.item = InnerMultipartItem::Field(Rc::clone(&field)); + + Poll::Ready(Some(Ok(Field::new(safety.clone(cx), headers, mt, field)))) + } + } + } +} + +impl Drop for InnerMultipart { + fn drop(&mut self) { + // InnerMultipartItem::Field has to be dropped first because of Safety. 
+ self.item = InnerMultipartItem::None; + } +} + +/// A single field in a multipart stream +pub struct Field { + ct: mime::Mime, + headers: HeaderMap, + inner: Rc>, + safety: Safety, +} + +impl Field { + fn new( + safety: Safety, + headers: HeaderMap, + ct: mime::Mime, + inner: Rc>, + ) -> Self { + Field { + ct, + headers, + inner, + safety, + } + } + + /// Get a map of headers + pub fn headers(&self) -> &HeaderMap { + &self.headers + } + + /// Get the content type of the field + pub fn content_type(&self) -> &mime::Mime { + &self.ct + } +} + +impl Stream for Field { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + if self.safety.current() { + let mut inner = self.inner.borrow_mut(); + if let Some(mut payload) = + inner.payload.as_ref().unwrap().get_mut(&self.safety) + { + payload.poll_stream(cx)?; + } + inner.poll(&self.safety) + } else if !self.safety.is_clean() { + Poll::Ready(Some(Err(MultipartError::NotConsumed))) + } else { + Poll::Pending + } + } +} + +impl fmt::Debug for Field { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + writeln!(f, "\nField: {}", self.ct)?; + writeln!(f, " boundary: {}", self.inner.borrow().boundary)?; + writeln!(f, " headers:")?; + for (key, val) in self.headers.iter() { + writeln!(f, " {:?}: {:?}", key, val)?; + } + Ok(()) + } +} + +struct InnerField { + payload: Option, + boundary: String, + eof: bool, + length: Option, +} + +impl InnerField { + fn new( + payload: PayloadRef, + boundary: String, + headers: &HeaderMap, + ) -> Result { + let len = if let Some(len) = headers.get(&header::CONTENT_LENGTH) { + if let Ok(s) = len.to_str() { + if let Ok(len) = s.parse::() { + Some(len) + } else { + return Err(PayloadError::Incomplete(None)); + } + } else { + return Err(PayloadError::Incomplete(None)); + } + } else { + None + }; + + Ok(InnerField { + boundary, + payload: Some(payload), + eof: false, + length: len, + }) + } + + /// Reads body part content chunk of the specified size. + /// The body part must has `Content-Length` header with proper value. + fn read_len( + payload: &mut PayloadBuffer, + size: &mut u64, + ) -> Poll>> { + if *size == 0 { + Poll::Ready(None) + } else { + match payload.read_max(*size)? { + Some(mut chunk) => { + let len = cmp::min(chunk.len() as u64, *size); + *size -= len; + let ch = chunk.split_to(len as usize); + if !chunk.is_empty() { + payload.unprocessed(chunk); + } + Poll::Ready(Some(Ok(ch))) + } + None => { + if payload.eof && (*size != 0) { + Poll::Ready(Some(Err(MultipartError::Incomplete))) + } else { + Poll::Pending + } + } + } + } + } + + /// Reads content chunk of body part with unknown length. + /// The `Content-Length` header for body part is not necessary. 
+ fn read_stream( + payload: &mut PayloadBuffer, + boundary: &str, + ) -> Poll>> { + let mut pos = 0; + + let len = payload.buf.len(); + if len == 0 { + return if payload.eof { + Poll::Ready(Some(Err(MultipartError::Incomplete))) + } else { + Poll::Pending + }; + } + + // check boundary + if len > 4 && payload.buf[0] == b'\r' { + let b_len = if &payload.buf[..2] == b"\r\n" && &payload.buf[2..4] == b"--" { + Some(4) + } else if &payload.buf[1..3] == b"--" { + Some(3) + } else { + None + }; + + if let Some(b_len) = b_len { + let b_size = boundary.len() + b_len; + if len < b_size { + return Poll::Pending; + } else if &payload.buf[b_len..b_size] == boundary.as_bytes() { + // found boundary + return Poll::Ready(None); + } + } + } + + loop { + return if let Some(idx) = twoway::find_bytes(&payload.buf[pos..], b"\r") { + let cur = pos + idx; + + // check if we have enough data for boundary detection + if cur + 4 > len { + if cur > 0 { + Poll::Ready(Some(Ok(payload.buf.split_to(cur).freeze()))) + } else { + Poll::Pending + } + } else { + // check boundary + if (&payload.buf[cur..cur + 2] == b"\r\n" + && &payload.buf[cur + 2..cur + 4] == b"--") + || (&payload.buf[cur..=cur] == b"\r" + && &payload.buf[cur + 1..cur + 3] == b"--") + { + if cur != 0 { + // return buffer + Poll::Ready(Some(Ok(payload.buf.split_to(cur).freeze()))) + } else { + pos = cur + 1; + continue; + } + } else { + // not boundary + pos = cur + 1; + continue; + } + } + } else { + Poll::Ready(Some(Ok(payload.buf.split().freeze()))) + }; + } + } + + fn poll(&mut self, s: &Safety) -> Poll>> { + if self.payload.is_none() { + return Poll::Ready(None); + } + + let result = if let Some(mut payload) = self.payload.as_ref().unwrap().get_mut(s) + { + if !self.eof { + let res = if let Some(ref mut len) = self.length { + InnerField::read_len(&mut *payload, len) + } else { + InnerField::read_stream(&mut *payload, &self.boundary) + }; + + match res { + Poll::Pending => return Poll::Pending, + Poll::Ready(Some(Ok(bytes))) => return Poll::Ready(Some(Ok(bytes))), + Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))), + Poll::Ready(None) => self.eof = true, + } + } + + match payload.readline() { + Ok(None) => Poll::Pending, + Ok(Some(line)) => { + if line.as_ref() != b"\r\n" { + log::warn!("multipart field did not read all the data or it is malformed"); + } + Poll::Ready(None) + } + Err(e) => Poll::Ready(Some(Err(e))), + } + } else { + Poll::Pending + }; + + if let Poll::Ready(None) = result { + self.payload.take(); + } + result + } +} + +struct PayloadRef { + payload: Rc>, +} + +impl PayloadRef { + fn new(payload: PayloadBuffer) -> PayloadRef { + PayloadRef { + payload: Rc::new(payload.into()), + } + } + + fn get_mut<'a, 'b>(&'a self, s: &'b Safety) -> Option> + where + 'a: 'b, + { + if s.current() { + Some(self.payload.borrow_mut()) + } else { + None + } + } +} + +impl Clone for PayloadRef { + fn clone(&self) -> PayloadRef { + PayloadRef { + payload: Rc::clone(&self.payload), + } + } +} + +/// Counter. It tracks of number of clones of payloads and give access to +/// payload only to top most task panics if Safety get destroyed and it not top +/// most task. 
+#[derive(Debug)] +struct Safety { + task: LocalWaker, + level: usize, + payload: Rc>, + clean: Rc>, +} + +impl Safety { + fn new() -> Safety { + let payload = Rc::new(PhantomData); + Safety { + task: LocalWaker::new(), + level: Rc::strong_count(&payload), + clean: Rc::new(Cell::new(true)), + payload, + } + } + + fn current(&self) -> bool { + Rc::strong_count(&self.payload) == self.level && self.clean.get() + } + + fn is_clean(&self) -> bool { + self.clean.get() + } + + fn clone(&self, cx: &mut Context) -> Safety { + let payload = Rc::clone(&self.payload); + let s = Safety { + task: LocalWaker::new(), + level: Rc::strong_count(&payload), + clean: self.clean.clone(), + payload, + }; + s.task.register(cx.waker()); + s + } +} + +impl Drop for Safety { + fn drop(&mut self) { + // parent task is dead + if Rc::strong_count(&self.payload) != self.level { + self.clean.set(true); + } + if let Some(task) = self.task.take() { + task.wake() + } + } +} + +/// Payload buffer +struct PayloadBuffer { + eof: bool, + buf: BytesMut, + stream: LocalBoxStream<'static, Result>, +} + +impl PayloadBuffer { + /// Create new `PayloadBuffer` instance + fn new(stream: S) -> Self + where + S: Stream> + 'static, + { + PayloadBuffer { + eof: false, + buf: BytesMut::new(), + stream: stream.boxed_local(), + } + } + + fn poll_stream(&mut self, cx: &mut Context) -> Result<(), PayloadError> { + loop { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Ready(Some(Ok(data))) => self.buf.extend_from_slice(&data), + Poll::Ready(Some(Err(e))) => return Err(e), + Poll::Ready(None) => { + self.eof = true; + return Ok(()); + } + Poll::Pending => return Ok(()), + } + } + } + + /// Read exact number of bytes + #[cfg(test)] + fn read_exact(&mut self, size: usize) -> Option { + if size <= self.buf.len() { + Some(self.buf.split_to(size).freeze()) + } else { + None + } + } + + fn read_max(&mut self, size: u64) -> Result, MultipartError> { + if !self.buf.is_empty() { + let size = std::cmp::min(self.buf.len() as u64, size) as usize; + Ok(Some(self.buf.split_to(size).freeze())) + } else if self.eof { + Err(MultipartError::Incomplete) + } else { + Ok(None) + } + } + + /// Read until specified ending + pub fn read_until(&mut self, line: &[u8]) -> Result, MultipartError> { + let res = twoway::find_bytes(&self.buf, line) + .map(|idx| self.buf.split_to(idx + line.len()).freeze()); + + if res.is_none() && self.eof { + Err(MultipartError::Incomplete) + } else { + Ok(res) + } + } + + /// Read bytes until new line delimiter + pub fn readline(&mut self) -> Result, MultipartError> { + self.read_until(b"\n") + } + + /// Read bytes until new line delimiter or eof + pub fn readline_or_eof(&mut self) -> Result, MultipartError> { + match self.readline() { + Err(MultipartError::Incomplete) if self.eof => { + Ok(Some(self.buf.split().freeze())) + } + line => line, + } + } + + /// Put unprocessed data back to the buffer + pub fn unprocessed(&mut self, data: Bytes) { + let buf = BytesMut::from(data.as_ref()); + let buf = std::mem::replace(&mut self.buf, buf); + self.buf.extend_from_slice(&buf); + } +} diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 00000000..94bd11d5 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,2 @@ +max_width = 89 +reorder_imports = true
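To make the expected wire format concrete, the following test-style sketch hand-builds a single-part `multipart/form-data` body and drains it through `Multipart::new` from `server.rs` above. The boundary string, field name, and use of `futures::executor::block_on` are illustrative assumptions; the header and stream types are the ones the constructor is declared with.

```rust
use bytes::Bytes;
use futures::{stream, StreamExt};
use ntex::http::error::PayloadError;
use ntex::http::header::{self, HeaderMap, HeaderValue};
use ntex_multipart::Multipart;

fn main() {
    // The Content-Type header carries the boundary that separates the parts.
    let mut headers = HeaderMap::new();
    headers.insert(
        header::CONTENT_TYPE,
        HeaderValue::from_static("multipart/form-data; boundary=abbc761f78ff"),
    );

    // One part: boundary line, part headers, blank line, data, then the
    // closing boundary terminated with "--".
    let body = Bytes::from_static(
        b"--abbc761f78ff\r\n\
          content-disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
          content-type: text/plain; charset=utf-8\r\n\r\n\
          test\r\n\
          --abbc761f78ff--\r\n",
    );
    let payload = stream::iter(vec![Ok::<_, PayloadError>(body)]);

    futures::executor::block_on(async {
        let mut multipart = Multipart::new(&headers, payload);
        while let Some(item) = multipart.next().await {
            let mut field = item.expect("field headers parse");
            println!("content-type: {}", field.content_type());
            while let Some(chunk) = field.next().await {
                println!("chunk: {:?}", chunk.expect("field data"));
            }
        }
    });
}
```

Parsing the body above should yield a single `text/plain` field whose data is the bytes `test`.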