From aca40ad6ff36db37f6f3ef705bc2d2fa913d8c2a Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 27 Dec 2021 18:28:50 +0600 Subject: [PATCH] Do not swallow decoded read bytes in case of filter error --- ntex-io/CHANGES.md | 4 ++++ ntex-io/Cargo.toml | 2 +- ntex-io/src/ioref.rs | 2 +- ntex-io/src/tasks.rs | 12 ++++++++++-- ntex-tls/CHANGES.md | 4 ++++ ntex/tests/connect.rs | 34 ++++++++++++++++++++++++++++++++++ 6 files changed, 54 insertions(+), 4 deletions(-) diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 9d213e9b..ac9224c9 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.1.0-b.7] - 2021-12-27 + +* Do not swallow decoded read bytes in case of filter error + ## [0.1.0-b.6] - 2021-12-26 * Rename `RecvError::StopDispatcher` to `RecvError::Stop` diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 86cf134d..05e73a22 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.1.0-b.6" +version = "0.1.0-b.7" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/ioref.rs b/ntex-io/src/ioref.rs index c7622453..67dd808b 100644 --- a/ntex-io/src/ioref.rs +++ b/ntex-io/src/ioref.rs @@ -82,7 +82,7 @@ impl IoRef { /// /// Dispatcher does not wait for uncompleted responses, but flushes io buffers. pub fn force_close(&self) { - log::trace!("force close framed object"); + log::trace!("force close io stream object"); self.0.insert_flags(Flags::DSP_STOP | Flags::IO_SHUTDOWN); self.0.read_task.wake(); self.0.write_task.wake(); diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 38099274..08e3e1f2 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -37,7 +37,8 @@ impl ReadContext { Ok(()) } else { let mut dst = self.0 .0.read_buf.take(); - let nbytes = self.0.filter().release_read_buf(buf, &mut dst, nbytes)?; + let result = self.0.filter().release_read_buf(buf, &mut dst, nbytes); + let nbytes = result.as_ref().map(|i| *i).unwrap_or(0); if let Some(dst) = dst { if self.0.flags().contains(Flags::IO_FILTERS) { @@ -61,7 +62,14 @@ impl ReadContext { self.0 .0.dispatch_task.wake(); self.0 .0.insert_flags(Flags::RD_READY); } - Ok(()) + + if let Err(err) = result { + self.0 .0.dispatch_task.wake(); + self.0 .0.insert_flags(Flags::RD_READY); + Err(err) + } else { + Ok(()) + } } } } diff --git a/ntex-tls/CHANGES.md b/ntex-tls/CHANGES.md index 076a37b2..6512a1c3 100644 --- a/ntex-tls/CHANGES.md +++ b/ntex-tls/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.1.0-b.4] - 2021-12-27 + +* Upgrade no ntex 0.5-b.4 + ## [0.1.0-b.3] - 2021-12-23 * Add impl openssl::Acceptor::from(SslAcceptor) diff --git a/ntex/tests/connect.rs b/ntex/tests/connect.rs index db89243c..7aee5a72 100644 --- a/ntex/tests/connect.rs +++ b/ntex/tests/connect.rs @@ -53,6 +53,40 @@ async fn test_openssl_string() { assert_eq!(item, Bytes::from_static(b"test")); } +#[cfg(feature = "openssl")] +#[ntex::test] +async fn test_openssl_read_before_error() { + env_logger::init(); + use ntex::server::openssl; + use tls_openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; + + let srv = test_server(|| { + ntex::pipeline_factory(fn_service(|io: Io<_>| async move { + let res = io.read_ready().await; + assert!(res.is_ok()); + Ok(io) + })) + .and_then(openssl::Acceptor::new(ssl_acceptor())) + .and_then(fn_service(|io: Io<_>| async move { + log::info!("ssl handshake completed"); + io.encode(Bytes::from_static(b"test"), &BytesCodec).unwrap(); + // ntex::time::sleep(ntex::time::Millis(1000)).await; + io.shutdown().await.unwrap(); + Ok::<_, Box>(()) + })) + }); + + let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); + builder.set_verify(SslVerifyMode::NONE); + + let conn = ntex::connect::openssl::Connector::new(builder.build()); + let addr = format!("127.0.0.1:{}", srv.addr().port()); + let io = conn.call(addr.into()).await.unwrap(); + let item = io.recv(&BytesCodec).await.unwrap().unwrap(); + assert_eq!(item, Bytes::from_static(b"test")); + assert!(io.recv(&BytesCodec).await.is_err()); +} + #[cfg(feature = "rustls")] #[ntex::test] async fn test_rustls_string() {