mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-02 20:37:39 +03:00
Redesign neon poll support (#535)
This commit is contained in:
parent
e904cf85f1
commit
e3f58cce27
12 changed files with 205 additions and 231 deletions
5
.github/workflows/cov.yml
vendored
5
.github/workflows/cov.yml
vendored
|
@ -8,11 +8,6 @@ jobs:
|
||||||
env:
|
env:
|
||||||
CARGO_TERM_COLOR: always
|
CARGO_TERM_COLOR: always
|
||||||
steps:
|
steps:
|
||||||
- name: Free Disk Space
|
|
||||||
uses: jlumbroso/free-disk-space@main
|
|
||||||
with:
|
|
||||||
tool-cache: true
|
|
||||||
|
|
||||||
- uses: actions/checkout@v4
|
- uses: actions/checkout@v4
|
||||||
- name: Install Rust
|
- name: Install Rust
|
||||||
run: rustup update nightly
|
run: rustup update nightly
|
||||||
|
|
5
.github/workflows/linux.yml
vendored
5
.github/workflows/linux.yml
vendored
|
@ -16,11 +16,6 @@ jobs:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- name: Free Disk Space
|
|
||||||
uses: jlumbroso/free-disk-space@main
|
|
||||||
with:
|
|
||||||
tool-cache: true
|
|
||||||
|
|
||||||
- uses: actions/checkout@v4
|
- uses: actions/checkout@v4
|
||||||
|
|
||||||
- name: Install ${{ matrix.version }}
|
- name: Install ${{ matrix.version }}
|
||||||
|
|
|
@ -1,5 +1,9 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [2.11.1] - 2025-03-20
|
||||||
|
|
||||||
|
* Add readiness check support
|
||||||
|
|
||||||
## [2.11.0] - 2025-03-10
|
## [2.11.0] - 2025-03-10
|
||||||
|
|
||||||
* Add single io context
|
* Add single io context
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "ntex-io"
|
name = "ntex-io"
|
||||||
version = "2.11.0"
|
version = "2.11.1"
|
||||||
authors = ["ntex contributors <team@ntex.rs>"]
|
authors = ["ntex contributors <team@ntex.rs>"]
|
||||||
description = "Utilities for encoding and decoding frames"
|
description = "Utilities for encoding and decoding frames"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
|
|
@ -722,28 +722,36 @@ impl IoContext {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get read buffer
|
/// Get read buffer
|
||||||
pub fn with_read_buf<F>(&self, f: F) -> Poll<()>
|
pub fn is_read_ready(&self) -> bool {
|
||||||
where
|
|
||||||
F: FnOnce(&mut BytesVec) -> Poll<io::Result<usize>>,
|
|
||||||
{
|
|
||||||
let result = self.with_read_buf_inner(f);
|
|
||||||
|
|
||||||
// check read readiness
|
// check read readiness
|
||||||
if result.is_pending() {
|
if let Some(waker) = self.0 .0.read_task.take() {
|
||||||
if let Some(waker) = self.0 .0.read_task.take() {
|
let mut cx = Context::from_waker(&waker);
|
||||||
let mut cx = Context::from_waker(&waker);
|
|
||||||
|
|
||||||
if let Poll::Ready(ReadStatus::Ready) =
|
if let Poll::Ready(ReadStatus::Ready) = self.0.filter().poll_read_ready(&mut cx)
|
||||||
self.0.filter().poll_read_ready(&mut cx)
|
{
|
||||||
{
|
return true;
|
||||||
return Poll::Pending;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
result
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
fn with_read_buf_inner<F>(&self, f: F) -> Poll<()>
|
pub fn is_write_ready(&self) -> bool {
|
||||||
|
if let Some(waker) = self.0 .0.write_task.take() {
|
||||||
|
let ready = self
|
||||||
|
.0
|
||||||
|
.filter()
|
||||||
|
.poll_write_ready(&mut Context::from_waker(&waker));
|
||||||
|
if !matches!(
|
||||||
|
ready,
|
||||||
|
Poll::Ready(WriteStatus::Ready | WriteStatus::Shutdown)
|
||||||
|
) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_read_buf<F>(&self, f: F) -> Poll<()>
|
||||||
where
|
where
|
||||||
F: FnOnce(&mut BytesVec) -> Poll<io::Result<usize>>,
|
F: FnOnce(&mut BytesVec) -> Poll<io::Result<usize>>,
|
||||||
{
|
{
|
||||||
|
@ -838,33 +846,8 @@ impl IoContext {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn with_write_buf<F>(&self, f: F) -> Poll<()>
|
|
||||||
where
|
|
||||||
F: FnOnce(&BytesVec) -> Poll<io::Result<usize>>,
|
|
||||||
{
|
|
||||||
let result = self.with_write_buf_inner(f);
|
|
||||||
|
|
||||||
// check write readiness
|
|
||||||
if result.is_pending() {
|
|
||||||
let inner = &self.0 .0;
|
|
||||||
if let Some(waker) = inner.write_task.take() {
|
|
||||||
let ready = self
|
|
||||||
.0
|
|
||||||
.filter()
|
|
||||||
.poll_write_ready(&mut Context::from_waker(&waker));
|
|
||||||
if !matches!(
|
|
||||||
ready,
|
|
||||||
Poll::Ready(WriteStatus::Ready | WriteStatus::Shutdown)
|
|
||||||
) {
|
|
||||||
return Poll::Ready(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
result
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get write buffer
|
/// Get write buffer
|
||||||
fn with_write_buf_inner<F>(&self, f: F) -> Poll<()>
|
pub fn with_write_buf<F>(&self, f: F) -> Poll<()>
|
||||||
where
|
where
|
||||||
F: FnOnce(&BytesVec) -> Poll<io::Result<usize>>,
|
F: FnOnce(&BytesVec) -> Poll<io::Result<usize>>,
|
||||||
{
|
{
|
||||||
|
|
|
@ -1,5 +1,9 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [2.5.6] - 2025-03-20
|
||||||
|
|
||||||
|
* Redesign neon poll support
|
||||||
|
|
||||||
## [2.5.5] - 2025-03-17
|
## [2.5.5] - 2025-03-17
|
||||||
|
|
||||||
* Add check for required io-uring opcodes
|
* Add check for required io-uring opcodes
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "ntex-net"
|
name = "ntex-net"
|
||||||
version = "2.5.5"
|
version = "2.5.6"
|
||||||
authors = ["ntex contributors <team@ntex.rs>"]
|
authors = ["ntex contributors <team@ntex.rs>"]
|
||||||
description = "ntexwork utils for ntex framework"
|
description = "ntexwork utils for ntex framework"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
@ -34,13 +34,13 @@ io-uring = ["ntex-neon/io-uring", "dep:io-uring"]
|
||||||
ntex-service = "3.3"
|
ntex-service = "3.3"
|
||||||
ntex-bytes = "0.1"
|
ntex-bytes = "0.1"
|
||||||
ntex-http = "0.1"
|
ntex-http = "0.1"
|
||||||
ntex-io = "2.11"
|
ntex-io = "2.11.1"
|
||||||
ntex-rt = "0.4.25"
|
ntex-rt = "0.4.25"
|
||||||
ntex-util = "2.5"
|
ntex-util = "2.5"
|
||||||
|
|
||||||
ntex-tokio = { version = "0.5.3", optional = true }
|
ntex-tokio = { version = "0.5.3", optional = true }
|
||||||
ntex-compio = { version = "0.2.4", optional = true }
|
ntex-compio = { version = "0.2.4", optional = true }
|
||||||
ntex-neon = { version = "0.1.5", optional = true }
|
ntex-neon = { version = "0.1.6", optional = true }
|
||||||
|
|
||||||
bitflags = { workspace = true }
|
bitflags = { workspace = true }
|
||||||
cfg-if = { workspace = true }
|
cfg-if = { workspace = true }
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use std::os::fd::{AsRawFd, RawFd};
|
use std::os::fd::{AsRawFd, RawFd};
|
||||||
use std::{cell::RefCell, collections::VecDeque, io, rc::Rc, task::Poll};
|
use std::{cell::RefCell, collections::VecDeque, io, rc::Rc, task::Poll};
|
||||||
|
|
||||||
use ntex_neon::driver::{DriverApi, Handler, Interest};
|
use ntex_neon::driver::{DriverApi, Event, Handler};
|
||||||
use ntex_neon::{syscall, Runtime};
|
use ntex_neon::{syscall, Runtime};
|
||||||
use ntex_util::channel::oneshot::Sender;
|
use ntex_util::channel::oneshot::Sender;
|
||||||
use slab::Slab;
|
use slab::Slab;
|
||||||
|
@ -12,8 +12,7 @@ pub(crate) struct ConnectOps(Rc<ConnectOpsInner>);
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum Change {
|
enum Change {
|
||||||
Readable,
|
Event(Event),
|
||||||
Writable,
|
|
||||||
Error(io::Error),
|
Error(io::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,20 +66,15 @@ impl ConnectOps {
|
||||||
let item = Item { fd, sender };
|
let item = Item { fd, sender };
|
||||||
let id = self.0.connects.borrow_mut().insert(item);
|
let id = self.0.connects.borrow_mut().insert(item);
|
||||||
|
|
||||||
self.0.api.register(fd, id, Interest::Writable);
|
self.0.api.attach(fd, id as u32, Some(Event::writable(0)));
|
||||||
Ok(id)
|
Ok(id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Handler for ConnectOpsBatcher {
|
impl Handler for ConnectOpsBatcher {
|
||||||
fn readable(&mut self, id: usize) {
|
fn event(&mut self, id: usize, event: Event) {
|
||||||
log::debug!("connect-fd is readable {:?}", id);
|
log::debug!("connect-fd is readable {:?}", id);
|
||||||
self.feed.push_back((id, Change::Readable));
|
self.feed.push_back((id, Change::Event(event)));
|
||||||
}
|
|
||||||
|
|
||||||
fn writable(&mut self, id: usize) {
|
|
||||||
log::debug!("connect-fd is writable {:?}", id);
|
|
||||||
self.feed.push_back((id, Change::Writable));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn error(&mut self, id: usize, err: io::Error) {
|
fn error(&mut self, id: usize, err: io::Error) {
|
||||||
|
@ -99,32 +93,33 @@ impl Handler for ConnectOpsBatcher {
|
||||||
if connects.contains(id) {
|
if connects.contains(id) {
|
||||||
let item = connects.remove(id);
|
let item = connects.remove(id);
|
||||||
match change {
|
match change {
|
||||||
Change::Readable => unreachable!(),
|
Change::Event(event) => {
|
||||||
Change::Writable => {
|
if event.writable {
|
||||||
let mut err: libc::c_int = 0;
|
let mut err: libc::c_int = 0;
|
||||||
let mut err_len =
|
let mut err_len =
|
||||||
std::mem::size_of::<libc::c_int>() as libc::socklen_t;
|
std::mem::size_of::<libc::c_int>() as libc::socklen_t;
|
||||||
|
|
||||||
let res = syscall!(libc::getsockopt(
|
let res = syscall!(libc::getsockopt(
|
||||||
item.fd.as_raw_fd(),
|
item.fd.as_raw_fd(),
|
||||||
libc::SOL_SOCKET,
|
libc::SOL_SOCKET,
|
||||||
libc::SO_ERROR,
|
libc::SO_ERROR,
|
||||||
&mut err as *mut _ as *mut _,
|
&mut err as *mut _ as *mut _,
|
||||||
&mut err_len
|
&mut err_len
|
||||||
));
|
));
|
||||||
|
|
||||||
let res = if err == 0 {
|
let res = if err == 0 {
|
||||||
res.map(|_| ())
|
res.map(|_| ())
|
||||||
} else {
|
} else {
|
||||||
Err(io::Error::from_raw_os_error(err))
|
Err(io::Error::from_raw_os_error(err))
|
||||||
};
|
};
|
||||||
|
|
||||||
self.inner.api.unregister_all(item.fd);
|
self.inner.api.detach(item.fd, id as u32);
|
||||||
let _ = item.sender.send(res);
|
let _ = item.sender.send(res);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Change::Error(err) => {
|
Change::Error(err) => {
|
||||||
let _ = item.sender.send(Err(err));
|
let _ = item.sender.send(Err(err));
|
||||||
self.inner.api.unregister_all(item.fd);
|
self.inner.api.detach(item.fd, id as u32);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use std::os::fd::{AsRawFd, RawFd};
|
use std::os::fd::{AsRawFd, RawFd};
|
||||||
use std::{cell::Cell, collections::VecDeque, future::Future, io, rc::Rc, task};
|
use std::{cell::Cell, collections::VecDeque, future::Future, io, rc::Rc, task};
|
||||||
|
|
||||||
use ntex_neon::driver::{DriverApi, Handler, Interest};
|
use ntex_neon::driver::{DriverApi, Event, Handler};
|
||||||
use ntex_neon::{syscall, Runtime};
|
use ntex_neon::{syscall, Runtime};
|
||||||
use slab::Slab;
|
use slab::Slab;
|
||||||
|
|
||||||
|
@ -9,7 +9,7 @@ use ntex_bytes::BufMut;
|
||||||
use ntex_io::IoContext;
|
use ntex_io::IoContext;
|
||||||
|
|
||||||
pub(crate) struct StreamCtl<T> {
|
pub(crate) struct StreamCtl<T> {
|
||||||
id: usize,
|
id: u32,
|
||||||
inner: Rc<StreamOpsInner<T>>,
|
inner: Rc<StreamOpsInner<T>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,8 +24,7 @@ pub(crate) struct StreamOps<T>(Rc<StreamOpsInner<T>>);
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum Change {
|
enum Change {
|
||||||
Readable,
|
Event(Event),
|
||||||
Writable,
|
|
||||||
Error(io::Error),
|
Error(io::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,7 +35,7 @@ struct StreamOpsHandler<T> {
|
||||||
|
|
||||||
struct StreamOpsInner<T> {
|
struct StreamOpsInner<T> {
|
||||||
api: DriverApi,
|
api: DriverApi,
|
||||||
feed: Cell<Option<VecDeque<usize>>>,
|
feed: Cell<Option<VecDeque<u32>>>,
|
||||||
streams: Cell<Option<Box<Slab<StreamItem<T>>>>>,
|
streams: Cell<Option<Box<Slab<StreamItem<T>>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,19 +61,23 @@ impl<T: AsRawFd + 'static> StreamOps<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl<T> {
|
pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl<T> {
|
||||||
|
let fd = io.as_raw_fd();
|
||||||
let item = StreamItem {
|
let item = StreamItem {
|
||||||
|
fd,
|
||||||
context,
|
context,
|
||||||
fd: io.as_raw_fd(),
|
|
||||||
io: Some(io),
|
io: Some(io),
|
||||||
ref_count: 1,
|
ref_count: 1,
|
||||||
};
|
};
|
||||||
self.with(|streams| {
|
let stream = self.with(move |streams| {
|
||||||
let id = streams.insert(item);
|
let id = streams.insert(item) as u32;
|
||||||
StreamCtl {
|
StreamCtl {
|
||||||
id,
|
id,
|
||||||
inner: self.0.clone(),
|
inner: self.0.clone(),
|
||||||
}
|
}
|
||||||
})
|
});
|
||||||
|
|
||||||
|
self.0.api.attach(fd, stream.id, None);
|
||||||
|
stream
|
||||||
}
|
}
|
||||||
|
|
||||||
fn with<F, R>(&self, f: F) -> R
|
fn with<F, R>(&self, f: F) -> R
|
||||||
|
@ -95,14 +98,9 @@ impl<T> Clone for StreamOps<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Handler for StreamOpsHandler<T> {
|
impl<T> Handler for StreamOpsHandler<T> {
|
||||||
fn readable(&mut self, id: usize) {
|
fn event(&mut self, id: usize, event: Event) {
|
||||||
log::debug!("FD is readable {:?}", id);
|
log::debug!("FD is readable {:?}", id);
|
||||||
self.feed.push_back((id, Change::Readable));
|
self.feed.push_back((id, Change::Event(event)));
|
||||||
}
|
|
||||||
|
|
||||||
fn writable(&mut self, id: usize) {
|
|
||||||
log::debug!("FD is writable {:?}", id);
|
|
||||||
self.feed.push_back((id, Change::Writable));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn error(&mut self, id: usize, err: io::Error) {
|
fn error(&mut self, id: usize, err: io::Error) {
|
||||||
|
@ -120,56 +118,75 @@ impl<T> Handler for StreamOpsHandler<T> {
|
||||||
|
|
||||||
for (id, change) in self.feed.drain(..) {
|
for (id, change) in self.feed.drain(..) {
|
||||||
match change {
|
match change {
|
||||||
Change::Readable => {
|
Change::Event(ev) => {
|
||||||
let item = &mut streams[id];
|
let item = &mut streams[id];
|
||||||
let result = item.context.with_read_buf(|buf| {
|
let mut renew_ev = Event::new(0, false, false).with_interrupt();
|
||||||
let chunk = buf.chunk_mut();
|
if ev.readable {
|
||||||
let b = chunk.as_mut_ptr();
|
let result = item.context.with_read_buf(|buf| {
|
||||||
task::Poll::Ready(
|
let chunk = buf.chunk_mut();
|
||||||
task::ready!(syscall!(
|
let b = chunk.as_mut_ptr();
|
||||||
break libc::read(item.fd, b as _, chunk.len())
|
task::Poll::Ready(
|
||||||
))
|
task::ready!(syscall!(
|
||||||
.inspect(|size| {
|
break libc::read(item.fd, b as _, chunk.len())
|
||||||
unsafe { buf.advance_mut(*size) };
|
))
|
||||||
log::debug!(
|
.inspect(|size| {
|
||||||
"{}: {:?}, SIZE: {:?}",
|
unsafe { buf.advance_mut(*size) };
|
||||||
item.context.tag(),
|
log::debug!(
|
||||||
item.fd,
|
"{}: {:?}, SIZE: {:?}",
|
||||||
size
|
item.context.tag(),
|
||||||
);
|
item.fd,
|
||||||
}),
|
size
|
||||||
)
|
);
|
||||||
});
|
}),
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
if item.io.is_some() && result.is_pending() {
|
if item.io.is_some() && result.is_pending() {
|
||||||
self.inner.api.register(item.fd, id, Interest::Readable);
|
if item.context.is_read_ready() {
|
||||||
|
renew_ev.readable = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
if ev.writable {
|
||||||
Change::Writable => {
|
let result = item.context.with_write_buf(|buf| {
|
||||||
let item = &mut streams[id];
|
log::debug!(
|
||||||
let result = item.context.with_write_buf(|buf| {
|
"{}: writing {:?} SIZE: {:?}",
|
||||||
log::debug!(
|
item.context.tag(),
|
||||||
"{}: writing {:?} SIZE: {:?}",
|
item.fd,
|
||||||
item.context.tag(),
|
buf.len()
|
||||||
item.fd,
|
);
|
||||||
buf.len()
|
let slice = &buf[..];
|
||||||
);
|
syscall!(
|
||||||
let slice = &buf[..];
|
break libc::write(
|
||||||
syscall!(
|
item.fd,
|
||||||
break libc::write(item.fd, slice.as_ptr() as _, slice.len())
|
slice.as_ptr() as _,
|
||||||
)
|
slice.len()
|
||||||
});
|
)
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
if item.io.is_some() && result.is_pending() {
|
if item.io.is_some() && result.is_pending() {
|
||||||
log::debug!("{}: want write {:?}", item.context.tag(), item.fd,);
|
if item.context.is_write_ready() {
|
||||||
self.inner.api.register(item.fd, id, Interest::Writable);
|
renew_ev.writable = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ev.is_interrupt() {
|
||||||
|
item.context.stopped(None);
|
||||||
|
if let Some(_) = item.io.take() {
|
||||||
|
close(id as u32, item.fd, &self.inner.api);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
self.inner.api.modify(item.fd, id as u32, renew_ev);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Change::Error(err) => {
|
Change::Error(err) => {
|
||||||
if let Some(item) = streams.get_mut(id) {
|
if let Some(item) = streams.get_mut(id) {
|
||||||
item.context.stopped(Some(err));
|
item.context.stopped(Some(err));
|
||||||
if let Some(_) = item.io.take() {
|
if let Some(_) = item.io.take() {
|
||||||
close(id, item.fd, &self.inner.api);
|
close(id as u32, item.fd, &self.inner.api);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -179,10 +196,10 @@ impl<T> Handler for StreamOpsHandler<T> {
|
||||||
// extra
|
// extra
|
||||||
let mut feed = self.inner.feed.take().unwrap();
|
let mut feed = self.inner.feed.take().unwrap();
|
||||||
for id in feed.drain(..) {
|
for id in feed.drain(..) {
|
||||||
let item = &mut streams[id];
|
let item = &mut streams[id as usize];
|
||||||
item.ref_count -= 1;
|
item.ref_count -= 1;
|
||||||
if item.ref_count == 0 {
|
if item.ref_count == 0 {
|
||||||
let item = streams.remove(id);
|
let item = streams.remove(id as usize);
|
||||||
log::debug!(
|
log::debug!(
|
||||||
"{}: Drop io ({}), {:?}, has-io: {}",
|
"{}: Drop io ({}), {:?}, has-io: {}",
|
||||||
item.context.tag(),
|
item.context.tag(),
|
||||||
|
@ -201,8 +218,8 @@ impl<T> Handler for StreamOpsHandler<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn close(id: usize, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle<io::Result<i32>> {
|
fn close(id: u32, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle<io::Result<i32>> {
|
||||||
api.unregister_all(fd);
|
api.detach(fd, id);
|
||||||
ntex_rt::spawn_blocking(move || {
|
ntex_rt::spawn_blocking(move || {
|
||||||
syscall!(libc::shutdown(fd, libc::SHUT_RDWR))?;
|
syscall!(libc::shutdown(fd, libc::SHUT_RDWR))?;
|
||||||
syscall!(libc::close(fd))
|
syscall!(libc::close(fd))
|
||||||
|
@ -211,10 +228,10 @@ fn close(id: usize, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle<io::Resul
|
||||||
|
|
||||||
impl<T> StreamCtl<T> {
|
impl<T> StreamCtl<T> {
|
||||||
pub(crate) fn close(self) -> impl Future<Output = io::Result<()>> {
|
pub(crate) fn close(self) -> impl Future<Output = io::Result<()>> {
|
||||||
let (io, fd) =
|
let id = self.id as usize;
|
||||||
self.with(|streams| (streams[self.id].io.take(), streams[self.id].fd));
|
let (io, fd) = self.with(|streams| (streams[id].io.take(), streams[id].fd));
|
||||||
let fut = if let Some(io) = io {
|
let fut = if let Some(io) = io {
|
||||||
log::debug!("Closing ({}), {:?}", self.id, fd);
|
log::debug!("Closing ({}), {:?}", id, fd);
|
||||||
std::mem::forget(io);
|
std::mem::forget(io);
|
||||||
Some(close(self.id, fd, &self.inner.api))
|
Some(close(self.id, fd, &self.inner.api))
|
||||||
} else {
|
} else {
|
||||||
|
@ -234,53 +251,32 @@ impl<T> StreamCtl<T> {
|
||||||
where
|
where
|
||||||
F: FnOnce(Option<&T>) -> R,
|
F: FnOnce(Option<&T>) -> R,
|
||||||
{
|
{
|
||||||
self.with(|streams| f(streams[self.id].io.as_ref()))
|
self.with(|streams| f(streams[self.id as usize].io.as_ref()))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn pause_all(&self) {
|
pub(crate) fn modify(&self, readable: bool, writable: bool) {
|
||||||
self.with(|streams| {
|
self.with(|streams| {
|
||||||
let item = &mut streams[self.id];
|
let item = &mut streams[self.id as usize];
|
||||||
|
|
||||||
log::debug!(
|
log::debug!(
|
||||||
"{}: Pause all io ({}), {:?}",
|
"{}: Modify interest ({}), {:?} read: {:?}, write: {:?}",
|
||||||
item.context.tag(),
|
item.context.tag(),
|
||||||
self.id,
|
self.id,
|
||||||
item.fd
|
item.fd,
|
||||||
);
|
readable,
|
||||||
self.inner.api.unregister_all(item.fd);
|
writable
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn pause_read(&self) {
|
|
||||||
self.with(|streams| {
|
|
||||||
let item = &mut streams[self.id];
|
|
||||||
|
|
||||||
log::debug!(
|
|
||||||
"{}: Pause io read ({}), {:?}",
|
|
||||||
item.context.tag(),
|
|
||||||
self.id,
|
|
||||||
item.fd
|
|
||||||
);
|
|
||||||
self.inner.api.unregister(item.fd, Interest::Readable);
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn resume_read(&self) {
|
|
||||||
self.with(|streams| {
|
|
||||||
let item = &mut streams[self.id];
|
|
||||||
|
|
||||||
log::debug!(
|
|
||||||
"{}: Resume io read ({}), {:?}",
|
|
||||||
item.context.tag(),
|
|
||||||
self.id,
|
|
||||||
item.fd
|
|
||||||
);
|
);
|
||||||
|
|
||||||
let result = item.context.with_read_buf(|buf| {
|
let mut event = Event::new(0, false, false).with_interrupt();
|
||||||
let chunk = buf.chunk_mut();
|
|
||||||
let b = chunk.as_mut_ptr();
|
if readable {
|
||||||
task::Poll::Ready(
|
let result = item.context.with_read_buf(|buf| {
|
||||||
task::ready!(syscall!(break libc::read(item.fd, b as _, chunk.len())))
|
let chunk = buf.chunk_mut();
|
||||||
|
let b = chunk.as_mut_ptr();
|
||||||
|
task::Poll::Ready(
|
||||||
|
task::ready!(syscall!(
|
||||||
|
break libc::read(item.fd, b as _, chunk.len())
|
||||||
|
))
|
||||||
.inspect(|size| {
|
.inspect(|size| {
|
||||||
unsafe { buf.advance_mut(*size) };
|
unsafe { buf.advance_mut(*size) };
|
||||||
log::debug!(
|
log::debug!(
|
||||||
|
@ -290,45 +286,37 @@ impl<T> StreamCtl<T> {
|
||||||
size
|
size
|
||||||
);
|
);
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
if item.io.is_some() && result.is_pending() {
|
if item.io.is_some() && result.is_pending() {
|
||||||
self.inner
|
if item.context.is_read_ready() {
|
||||||
.api
|
event.readable = true;
|
||||||
.register(item.fd, self.id, Interest::Readable);
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn resume_write(&self) {
|
if writable {
|
||||||
self.with(|streams| {
|
let result = item.context.with_write_buf(|buf| {
|
||||||
let item = &mut streams[self.id];
|
log::debug!(
|
||||||
|
"{}: Writing io ({}), buf: {:?}",
|
||||||
|
item.context.tag(),
|
||||||
|
self.id,
|
||||||
|
buf.len()
|
||||||
|
);
|
||||||
|
|
||||||
let result = item.context.with_write_buf(|buf| {
|
let slice = &buf[..];
|
||||||
log::debug!(
|
syscall!(break libc::write(item.fd, slice.as_ptr() as _, slice.len()))
|
||||||
"{}: Writing io ({}), buf: {:?}",
|
});
|
||||||
item.context.tag(),
|
|
||||||
self.id,
|
|
||||||
buf.len()
|
|
||||||
);
|
|
||||||
|
|
||||||
let slice = &buf[..];
|
if item.io.is_some() && result.is_pending() {
|
||||||
syscall!(break libc::write(item.fd, slice.as_ptr() as _, slice.len()))
|
if item.context.is_write_ready() {
|
||||||
});
|
event.writable = true;
|
||||||
|
}
|
||||||
if item.io.is_some() && result.is_pending() {
|
}
|
||||||
log::debug!(
|
|
||||||
"{}: Write is pending ({}), {:?}",
|
|
||||||
item.context.tag(),
|
|
||||||
self.id,
|
|
||||||
item.context.flags()
|
|
||||||
);
|
|
||||||
|
|
||||||
self.inner
|
|
||||||
.api
|
|
||||||
.register(item.fd, self.id, Interest::Writable);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.inner.api.modify(item.fd, self.id as u32, event);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -346,7 +334,7 @@ impl<T> StreamCtl<T> {
|
||||||
impl<T> Clone for StreamCtl<T> {
|
impl<T> Clone for StreamCtl<T> {
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
self.with(|streams| {
|
self.with(|streams| {
|
||||||
streams[self.id].ref_count += 1;
|
streams[self.id as usize].ref_count += 1;
|
||||||
Self {
|
Self {
|
||||||
id: self.id,
|
id: self.id,
|
||||||
inner: self.inner.clone(),
|
inner: self.inner.clone(),
|
||||||
|
@ -358,9 +346,10 @@ impl<T> Clone for StreamCtl<T> {
|
||||||
impl<T> Drop for StreamCtl<T> {
|
impl<T> Drop for StreamCtl<T> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if let Some(mut streams) = self.inner.streams.take() {
|
if let Some(mut streams) = self.inner.streams.take() {
|
||||||
streams[self.id].ref_count -= 1;
|
let id = self.id as usize;
|
||||||
if streams[self.id].ref_count == 0 {
|
streams[id].ref_count -= 1;
|
||||||
let item = streams.remove(self.id);
|
if streams[id].ref_count == 0 {
|
||||||
|
let item = streams.remove(id);
|
||||||
log::debug!(
|
log::debug!(
|
||||||
"{}: Drop io ({}), {:?}, has-io: {}",
|
"{}: Drop io ({}), {:?}, has-io: {}",
|
||||||
item.context.tag(),
|
item.context.tag(),
|
||||||
|
|
|
@ -54,21 +54,26 @@ enum Status {
|
||||||
async fn run<T>(ctl: StreamCtl<T>, context: IoContext) {
|
async fn run<T>(ctl: StreamCtl<T>, context: IoContext) {
|
||||||
// Handle io read readiness
|
// Handle io read readiness
|
||||||
let st = poll_fn(|cx| {
|
let st = poll_fn(|cx| {
|
||||||
|
let mut modify = false;
|
||||||
|
let mut readable = false;
|
||||||
|
let mut writable = false;
|
||||||
let read = match context.poll_read_ready(cx) {
|
let read = match context.poll_read_ready(cx) {
|
||||||
Poll::Ready(ReadStatus::Ready) => {
|
Poll::Ready(ReadStatus::Ready) => {
|
||||||
ctl.resume_read();
|
modify = true;
|
||||||
|
readable = true;
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
Poll::Ready(ReadStatus::Terminate) => Poll::Ready(()),
|
Poll::Ready(ReadStatus::Terminate) => Poll::Ready(()),
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
ctl.pause_read();
|
modify = true;
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let write = match context.poll_write_ready(cx) {
|
let write = match context.poll_write_ready(cx) {
|
||||||
Poll::Ready(WriteStatus::Ready) => {
|
Poll::Ready(WriteStatus::Ready) => {
|
||||||
ctl.resume_write();
|
modify = true;
|
||||||
|
writable = true;
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
Poll::Ready(WriteStatus::Shutdown) => Poll::Ready(Status::Shutdown),
|
Poll::Ready(WriteStatus::Shutdown) => Poll::Ready(Status::Shutdown),
|
||||||
|
@ -76,6 +81,10 @@ async fn run<T>(ctl: StreamCtl<T>, context: IoContext) {
|
||||||
Poll::Pending => Poll::Pending,
|
Poll::Pending => Poll::Pending,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if modify {
|
||||||
|
ctl.modify(readable, writable);
|
||||||
|
}
|
||||||
|
|
||||||
if read.is_pending() && write.is_pending() {
|
if read.is_pending() && write.is_pending() {
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
} else if write.is_ready() {
|
} else if write.is_ready() {
|
||||||
|
@ -86,7 +95,7 @@ async fn run<T>(ctl: StreamCtl<T>, context: IoContext) {
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
ctl.resume_write();
|
ctl.modify(false, true);
|
||||||
context.shutdown(st == Status::Shutdown).await;
|
context.shutdown(st == Status::Shutdown).await;
|
||||||
context.stopped(ctl.close().await.err());
|
context.stopped(ctl.close().await.err());
|
||||||
}
|
}
|
||||||
|
|
|
@ -246,7 +246,7 @@ where
|
||||||
.run();
|
.run();
|
||||||
|
|
||||||
crate::rt::spawn(async move {
|
crate::rt::spawn(async move {
|
||||||
sleep(Millis(75)).await;
|
sleep(Millis(125)).await;
|
||||||
tx.send((system, srv, local_addr)).unwrap();
|
tx.send((system, srv, local_addr)).unwrap();
|
||||||
});
|
});
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -698,7 +698,7 @@ where
|
||||||
.run();
|
.run();
|
||||||
|
|
||||||
crate::rt::spawn(async move {
|
crate::rt::spawn(async move {
|
||||||
sleep(Millis(75)).await;
|
sleep(Millis(125)).await;
|
||||||
tx.send((System::current(), srv, local_addr)).unwrap();
|
tx.send((System::current(), srv, local_addr)).unwrap();
|
||||||
});
|
});
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue