mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-01 20:07:39 +03:00
Add check for required io-uring opcodes (#532)
Some checks failed
Coverage / coverage (push) Failing after 2s
CI (OSX) / nightly - aarch64-apple-darwin (push) Waiting to run
CI (OSX) / stable - aarch64-apple-darwin (push) Waiting to run
CI (Windows) / nightly - x86_64-pc-windows-msvc (push) Waiting to run
CI (Windows) / stable - x86_64-pc-windows-msvc (push) Waiting to run
Checks / Check (push) Failing after 2s
Checks / Clippy (push) Failing after 2s
Checks / Rustfmt (push) Failing after 2s
CI (Linux) / nightly - x86_64-unknown-linux-gnu (push) Failing after 2s
CI (Linux) / stable - x86_64-unknown-linux-gnu (push) Failing after 2s
CI (Linux) / 1.75.0 - x86_64-unknown-linux-gnu (push) Failing after 2s
Some checks failed
Coverage / coverage (push) Failing after 2s
CI (OSX) / nightly - aarch64-apple-darwin (push) Waiting to run
CI (OSX) / stable - aarch64-apple-darwin (push) Waiting to run
CI (Windows) / nightly - x86_64-pc-windows-msvc (push) Waiting to run
CI (Windows) / stable - x86_64-pc-windows-msvc (push) Waiting to run
Checks / Check (push) Failing after 2s
Checks / Clippy (push) Failing after 2s
Checks / Rustfmt (push) Failing after 2s
CI (Linux) / nightly - x86_64-unknown-linux-gnu (push) Failing after 2s
CI (Linux) / stable - x86_64-unknown-linux-gnu (push) Failing after 2s
CI (Linux) / 1.75.0 - x86_64-unknown-linux-gnu (push) Failing after 2s
This commit is contained in:
parent
11734e8f1b
commit
5621ca1898
4 changed files with 83 additions and 25 deletions
|
@ -1,5 +1,11 @@
|
|||
# Changes
|
||||
|
||||
## [2.5.5] - 2025-03-17
|
||||
|
||||
* Add check for required io-uring opcodes
|
||||
|
||||
* Handle io-uring cancelation
|
||||
|
||||
## [2.5.4] - 2025-03-15
|
||||
|
||||
* Close FD in various case for poll driver
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "ntex-net"
|
||||
version = "2.5.4"
|
||||
version = "2.5.5"
|
||||
authors = ["ntex contributors <team@ntex.rs>"]
|
||||
description = "ntexwork utils for ntex framework"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
@ -40,7 +40,7 @@ ntex-util = "2.5"
|
|||
|
||||
ntex-tokio = { version = "0.5.3", optional = true }
|
||||
ntex-compio = { version = "0.2.4", optional = true }
|
||||
ntex-neon = { version = "0.1.4", optional = true }
|
||||
ntex-neon = { version = "0.1.5", optional = true }
|
||||
|
||||
bitflags = { workspace = true }
|
||||
cfg-if = { workspace = true }
|
||||
|
|
|
@ -32,6 +32,10 @@ impl ConnectOps {
|
|||
Runtime::value(|rt| {
|
||||
let mut inner = None;
|
||||
rt.driver().register(|api| {
|
||||
if !api.is_supported(opcode::Connect::CODE) {
|
||||
panic!("opcode::Connect is required for io-uring support");
|
||||
}
|
||||
|
||||
let ops = Rc::new(ConnectOpsInner {
|
||||
api,
|
||||
ops: RefCell::new(Slab::new()),
|
||||
|
|
|
@ -13,11 +13,22 @@ pub(crate) struct StreamCtl<T> {
|
|||
inner: Rc<StreamOpsInner<T>>,
|
||||
}
|
||||
|
||||
bitflags::bitflags! {
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
|
||||
struct Flags: u8 {
|
||||
const RD_CANCELING = 0b0000_0001;
|
||||
const RD_REISSUE = 0b0000_0010;
|
||||
const WR_CANCELING = 0b0001_0000;
|
||||
const WR_REISSUE = 0b0010_0000;
|
||||
}
|
||||
}
|
||||
|
||||
struct StreamItem<T> {
|
||||
io: Option<T>,
|
||||
fd: Fd,
|
||||
context: IoContext,
|
||||
ref_count: usize,
|
||||
flags: Flags,
|
||||
rd_op: Option<NonZeroU32>,
|
||||
wr_op: Option<NonZeroU32>,
|
||||
}
|
||||
|
@ -61,6 +72,16 @@ impl<T: os::fd::AsRawFd + 'static> StreamOps<T> {
|
|||
Runtime::value(|rt| {
|
||||
let mut inner = None;
|
||||
rt.driver().register(|api| {
|
||||
if !api.is_supported(opcode::Recv::CODE) {
|
||||
panic!("opcode::Recv is required for io-uring support");
|
||||
}
|
||||
if !api.is_supported(opcode::Send::CODE) {
|
||||
panic!("opcode::Send is required for io-uring support");
|
||||
}
|
||||
if !api.is_supported(opcode::Close::CODE) {
|
||||
panic!("opcode::Close is required for io-uring support");
|
||||
}
|
||||
|
||||
let mut ops = Slab::new();
|
||||
ops.insert(Operation::Nop);
|
||||
|
||||
|
@ -88,6 +109,7 @@ impl<T: os::fd::AsRawFd + 'static> StreamOps<T> {
|
|||
ref_count: 1,
|
||||
rd_op: None,
|
||||
wr_op: None,
|
||||
flags: Flags::empty(),
|
||||
};
|
||||
let id = self.0.storage.borrow_mut().streams.insert(item);
|
||||
StreamCtl {
|
||||
|
@ -116,10 +138,19 @@ impl<T> Handler for StreamOpsHandler<T> {
|
|||
|
||||
match storage.ops.remove(user_data) {
|
||||
Operation::Recv { id, buf, context } => {
|
||||
log::debug!("{}: Recv canceled {:?}", context.tag(), id,);
|
||||
log::debug!("{}: Recv canceled {:?}", context.tag(), id);
|
||||
context.release_read_buf(buf);
|
||||
if let Some(item) = storage.streams.get_mut(id) {
|
||||
item.rd_op.take();
|
||||
item.flags.remove(Flags::RD_CANCELING);
|
||||
if item.flags.contains(Flags::RD_REISSUE) {
|
||||
item.flags.remove(Flags::RD_REISSUE);
|
||||
|
||||
let result = storage.recv(id, Some(context));
|
||||
if let Some((id, op)) = result {
|
||||
self.inner.api.submit(id, op);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Operation::Send { id, buf, context } => {
|
||||
|
@ -127,6 +158,15 @@ impl<T> Handler for StreamOpsHandler<T> {
|
|||
context.release_write_buf(buf);
|
||||
if let Some(item) = storage.streams.get_mut(id) {
|
||||
item.wr_op.take();
|
||||
item.flags.remove(Flags::WR_CANCELING);
|
||||
if item.flags.contains(Flags::WR_REISSUE) {
|
||||
item.flags.remove(Flags::WR_REISSUE);
|
||||
|
||||
let result = storage.send(id, Some(context));
|
||||
if let Some((id, op)) = result {
|
||||
self.inner.api.submit(id, op);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Operation::Nop | Operation::Close { .. } => {}
|
||||
|
@ -151,12 +191,11 @@ impl<T> Handler for StreamOpsHandler<T> {
|
|||
// reset op reference
|
||||
if let Some(item) = storage.streams.get_mut(id) {
|
||||
log::debug!(
|
||||
"{}: Recv completed {:?}, res: {:?}, buf({}): {:?}",
|
||||
"{}: Recv completed {:?}, res: {:?}, buf({})",
|
||||
context.tag(),
|
||||
item.fd,
|
||||
result,
|
||||
buf.remaining_mut(),
|
||||
buf,
|
||||
buf.remaining_mut()
|
||||
);
|
||||
item.rd_op.take();
|
||||
}
|
||||
|
@ -173,21 +212,24 @@ impl<T> Handler for StreamOpsHandler<T> {
|
|||
}
|
||||
Operation::Send { id, buf, context } => {
|
||||
// reset op reference
|
||||
if let Some(item) = storage.streams.get_mut(id) {
|
||||
let fd = if let Some(item) = storage.streams.get_mut(id) {
|
||||
log::debug!(
|
||||
"{}: Send completed: {:?}, res: {:?}",
|
||||
"{}: Send completed: {:?}, res: {:?}, buf({})",
|
||||
context.tag(),
|
||||
item.fd,
|
||||
result
|
||||
result,
|
||||
buf.len()
|
||||
);
|
||||
item.wr_op.take();
|
||||
}
|
||||
Some(item.fd)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// set read buf
|
||||
if context
|
||||
.set_write_buf(result.map(|size| size as usize), buf)
|
||||
.is_pending()
|
||||
{
|
||||
let result = context.set_write_buf(result.map(|size| size as usize), buf);
|
||||
if result.is_pending() {
|
||||
log::debug!("{}: Need to send more: {:?}", context.tag(), fd);
|
||||
if let Some((id, op)) = storage.send(id, Some(context)) {
|
||||
self.inner.api.submit(id, op);
|
||||
}
|
||||
|
@ -230,11 +272,10 @@ impl<T> StreamOpsStorage<T> {
|
|||
if item.rd_op.is_none() {
|
||||
if let Poll::Ready(mut buf) = item.context.get_read_buf() {
|
||||
log::debug!(
|
||||
"{}: Recv resume ({}), {:?} - {:?} = {:?}",
|
||||
"{}: Recv resume ({}), {:?} rem: {:?}",
|
||||
item.context.tag(),
|
||||
id,
|
||||
item.fd,
|
||||
buf,
|
||||
buf.remaining_mut()
|
||||
);
|
||||
|
||||
|
@ -252,6 +293,8 @@ impl<T> StreamOpsStorage<T> {
|
|||
item.rd_op = NonZeroU32::new(op_id as u32);
|
||||
return Some((op_id as u32, op));
|
||||
}
|
||||
} else if item.flags.contains(Flags::RD_CANCELING) {
|
||||
item.flags.insert(Flags::RD_REISSUE);
|
||||
}
|
||||
None
|
||||
}
|
||||
|
@ -262,11 +305,11 @@ impl<T> StreamOpsStorage<T> {
|
|||
if item.wr_op.is_none() {
|
||||
if let Poll::Ready(buf) = item.context.get_write_buf() {
|
||||
log::debug!(
|
||||
"{}: Send resume ({}), {:?} {:?}",
|
||||
"{}: Send resume ({}), {:?} len: {:?}",
|
||||
item.context.tag(),
|
||||
id,
|
||||
item.fd,
|
||||
buf
|
||||
buf.len()
|
||||
);
|
||||
|
||||
let slice = buf.chunk();
|
||||
|
@ -283,6 +326,8 @@ impl<T> StreamOpsStorage<T> {
|
|||
item.wr_op = NonZeroU32::new(op_id as u32);
|
||||
return Some((op_id as u32, op));
|
||||
}
|
||||
} else if item.flags.contains(Flags::WR_CANCELING) {
|
||||
item.flags.insert(Flags::WR_REISSUE);
|
||||
}
|
||||
None
|
||||
}
|
||||
|
@ -350,13 +395,16 @@ impl<T> StreamCtl<T> {
|
|||
let item = &mut storage.streams[self.id];
|
||||
|
||||
if let Some(rd_op) = item.rd_op {
|
||||
log::debug!(
|
||||
"{}: Recv to pause ({}), {:?}",
|
||||
item.context.tag(),
|
||||
self.id,
|
||||
item.fd
|
||||
);
|
||||
self.inner.api.cancel(rd_op.get());
|
||||
if !item.flags.contains(Flags::RD_CANCELING) {
|
||||
log::debug!(
|
||||
"{}: Recv to pause ({}), {:?}",
|
||||
item.context.tag(),
|
||||
self.id,
|
||||
item.fd
|
||||
);
|
||||
item.flags.insert(Flags::RD_CANCELING);
|
||||
self.inner.api.cancel(rd_op.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue