Do not swallow decoded read bytes in case of filter error

This commit is contained in:
Nikolay Kim 2021-12-27 18:28:50 +06:00
parent baba17d48c
commit aca40ad6ff
6 changed files with 54 additions and 4 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.0-b.7] - 2021-12-27
* Do not swallow decoded read bytes in case of filter error
## [0.1.0-b.6] - 2021-12-26
* Rename `RecvError::StopDispatcher` to `RecvError::Stop`

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "0.1.0-b.6"
version = "0.1.0-b.7"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]

View file

@ -82,7 +82,7 @@ impl IoRef {
///
/// Dispatcher does not wait for uncompleted responses, but flushes io buffers.
pub fn force_close(&self) {
log::trace!("force close framed object");
log::trace!("force close io stream object");
self.0.insert_flags(Flags::DSP_STOP | Flags::IO_SHUTDOWN);
self.0.read_task.wake();
self.0.write_task.wake();

View file

@ -37,7 +37,8 @@ impl ReadContext {
Ok(())
} else {
let mut dst = self.0 .0.read_buf.take();
let nbytes = self.0.filter().release_read_buf(buf, &mut dst, nbytes)?;
let result = self.0.filter().release_read_buf(buf, &mut dst, nbytes);
let nbytes = result.as_ref().map(|i| *i).unwrap_or(0);
if let Some(dst) = dst {
if self.0.flags().contains(Flags::IO_FILTERS) {
@ -61,7 +62,14 @@ impl ReadContext {
self.0 .0.dispatch_task.wake();
self.0 .0.insert_flags(Flags::RD_READY);
}
Ok(())
if let Err(err) = result {
self.0 .0.dispatch_task.wake();
self.0 .0.insert_flags(Flags::RD_READY);
Err(err)
} else {
Ok(())
}
}
}
}

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.0-b.4] - 2021-12-27
* Upgrade no ntex 0.5-b.4
## [0.1.0-b.3] - 2021-12-23
* Add impl openssl::Acceptor::from(SslAcceptor)

View file

@ -53,6 +53,40 @@ async fn test_openssl_string() {
assert_eq!(item, Bytes::from_static(b"test"));
}
#[cfg(feature = "openssl")]
#[ntex::test]
async fn test_openssl_read_before_error() {
env_logger::init();
use ntex::server::openssl;
use tls_openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
let srv = test_server(|| {
ntex::pipeline_factory(fn_service(|io: Io<_>| async move {
let res = io.read_ready().await;
assert!(res.is_ok());
Ok(io)
}))
.and_then(openssl::Acceptor::new(ssl_acceptor()))
.and_then(fn_service(|io: Io<_>| async move {
log::info!("ssl handshake completed");
io.encode(Bytes::from_static(b"test"), &BytesCodec).unwrap();
// ntex::time::sleep(ntex::time::Millis(1000)).await;
io.shutdown().await.unwrap();
Ok::<_, Box<dyn std::error::Error>>(())
}))
});
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
builder.set_verify(SslVerifyMode::NONE);
let conn = ntex::connect::openssl::Connector::new(builder.build());
let addr = format!("127.0.0.1:{}", srv.addr().port());
let io = conn.call(addr.into()).await.unwrap();
let item = io.recv(&BytesCodec).await.unwrap().unwrap();
assert_eq!(item, Bytes::from_static(b"test"));
assert!(io.recv(&BytesCodec).await.is_err());
}
#[cfg(feature = "rustls")]
#[ntex::test]
async fn test_rustls_string() {