Check for nested io operations (#170)

This commit is contained in:
Nikolay Kim 2023-01-30 16:36:29 +06:00 committed by GitHub
parent 0f8387c3ac
commit 7f3efca56b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 80 additions and 9 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.2.8] - 2023-01-30
* Check for nested io operations
## [0.2.7] - 2023-01-29
* Refactor buffer api

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "0.2.7"
version = "0.2.8"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]

View file

@ -152,6 +152,13 @@ impl Stack {
}
let result = f(rb.as_mut().unwrap());
// check nested updates
if item.0.take().is_some() {
log::error!("Nested read io operation is detected");
io.force_close();
}
if let Some(b) = rb {
if b.is_empty() {
io.memory_pool().release_read_buf(b);
@ -191,6 +198,13 @@ impl Stack {
let mut wb = item.1.take();
let result = f(&mut wb);
// check nested updates
if item.1.take().is_some() {
log::error!("Nested write io operation is detected");
io.force_close();
}
if let Some(b) = wb {
if b.is_empty() {
io.memory_pool().release_write_buf(b);

View file

@ -407,13 +407,10 @@ mod tests {
use ntex_bytes::{Bytes, PoolId, PoolRef};
use ntex_codec::BytesCodec;
use ntex_util::future::Ready;
use ntex_util::time::{sleep, Millis};
use crate::testing::IoTest;
use crate::{io::Flags, Io, IoRef, IoStream};
use ntex_util::{future::Ready, time::sleep, time::Millis, time::Seconds};
use super::*;
use crate::{io::Flags, testing::IoTest, Io, IoRef, IoStream};
pub(crate) struct State(IoRef);
@ -705,6 +702,55 @@ mod tests {
assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1, 2]);
}
#[ntex::test]
async fn test_disconnect_during_read_backpressure() {
env_logger::init();
let (client, server) = IoTest::create();
client.remote_buffer_cap(0);
let (disp, state) = Dispatcher::debug(
server,
BytesCodec,
ntex_util::services::inflight::InFlightService::new(
1,
ntex_service::fn_service(move |msg: DispatchItem<BytesCodec>| async move {
match msg {
DispatchItem::Item(_) => {
sleep(Millis(500)).await;
return Ok::<_, ()>(None);
}
_ => (),
}
Ok(None)
}),
),
);
let disp = disp.keepalive_timeout(Seconds::ZERO);
let pool = PoolId::P10.pool_ref();
pool.set_read_params(1024, 512);
state.set_memory_pool(pool);
let (tx, rx) = ntex::channel::oneshot::channel();
ntex::rt::spawn(async move {
let _ = disp.await;
let _ = tx.send(());
});
let bytes = rand::thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
.take(1024)
.map(char::from)
.collect::<String>();
client.write(bytes.clone());
sleep(Millis(25)).await;
client.write(bytes);
sleep(Millis(25)).await;
// close read side
state.close();
let _ = rx.recv().await;
}
#[ntex::test]
async fn test_keepalive() {
let (client, server) = IoTest::create();

View file

@ -76,6 +76,9 @@ impl Filter for Base {
if flags.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) {
Poll::Ready(ReadStatus::Terminate)
} else if flags.intersects(Flags::IO_STOPPING_FILTERS) {
self.0 .0.read_task.register(cx.waker());
Poll::Ready(ReadStatus::Ready)
} else if flags.intersects(Flags::RD_PAUSED | Flags::RD_BUF_FULL) {
self.0 .0.read_task.register(cx.waker());
Poll::Pending

View file

@ -350,9 +350,11 @@ impl<F> Io<F> {
#[inline]
/// Pause read task
pub fn pause(&self) {
if !self.0.flags().contains(Flags::RD_PAUSED) {
self.0 .0.read_task.wake();
self.0 .0.insert_flags(Flags::RD_PAUSED);
}
}
#[inline]
/// Encode item, send to a peer

View file

@ -196,6 +196,8 @@ impl IoRef {
log::debug!("start keep-alive timeout {:?}", timeout);
self.0.insert_flags(Flags::KEEPALIVE);
self.0.keepalive.set(timer::register(timeout, self));
} else {
self.0.remove_flags(Flags::KEEPALIVE);
}
}

View file

@ -58,7 +58,7 @@ ntex-util = "0.2.0"
ntex-bytes = "0.1.19"
ntex-h2 = "0.2.1"
ntex-rt = "0.4.7"
ntex-io = "0.2.7"
ntex-io = "0.2.8"
ntex-tls = "0.2.4"
ntex-tokio = { version = "0.2.1", optional = true }
ntex-glommio = { version = "0.2.1", optional = true }