mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-04 13:27:39 +03:00
Merge 1fefbf2e6f
into 01d3a2440b
This commit is contained in:
commit
2d1611a0b0
19 changed files with 324 additions and 186 deletions
|
@ -45,7 +45,12 @@ ntex-util = { path = "ntex-util" }
|
||||||
ntex-compio = { path = "ntex-compio" }
|
ntex-compio = { path = "ntex-compio" }
|
||||||
ntex-tokio = { path = "ntex-tokio" }
|
ntex-tokio = { path = "ntex-tokio" }
|
||||||
|
|
||||||
|
ntex-neon = { git = "https://github.com/ntex-rs/neon.git" }
|
||||||
|
#ntex-neon = { path = "../dev/neon" }
|
||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
|
ntex-polling = "3.7.4"
|
||||||
|
|
||||||
async-channel = "2"
|
async-channel = "2"
|
||||||
async-task = "4.5.0"
|
async-task = "4.5.0"
|
||||||
atomic-waker = "1.1"
|
atomic-waker = "1.1"
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "ntex-io"
|
name = "ntex-io"
|
||||||
version = "2.11.1"
|
version = "2.11.2"
|
||||||
authors = ["ntex contributors <team@ntex.rs>"]
|
authors = ["ntex contributors <team@ntex.rs>"]
|
||||||
description = "Utilities for encoding and decoding frames"
|
description = "Utilities for encoding and decoding frames"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
|
|
@ -98,17 +98,19 @@ impl IoState {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn io_stopped(&self, err: Option<io::Error>) {
|
pub(super) fn io_stopped(&self, err: Option<io::Error>) {
|
||||||
if err.is_some() {
|
if !self.flags.get().contains(Flags::IO_STOPPED) {
|
||||||
self.error.set(err);
|
if err.is_some() {
|
||||||
|
self.error.set(err);
|
||||||
|
}
|
||||||
|
self.read_task.wake();
|
||||||
|
self.write_task.wake();
|
||||||
|
self.dispatch_task.wake();
|
||||||
|
self.notify_disconnect();
|
||||||
|
self.handle.take();
|
||||||
|
self.insert_flags(
|
||||||
|
Flags::IO_STOPPED | Flags::IO_STOPPING | Flags::IO_STOPPING_FILTERS,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
self.read_task.wake();
|
|
||||||
self.write_task.wake();
|
|
||||||
self.dispatch_task.wake();
|
|
||||||
self.notify_disconnect();
|
|
||||||
self.handle.take();
|
|
||||||
self.insert_flags(
|
|
||||||
Flags::IO_STOPPED | Flags::IO_STOPPING | Flags::IO_STOPPING_FILTERS,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Gracefully shutdown read and write io tasks
|
/// Gracefully shutdown read and write io tasks
|
||||||
|
|
|
@ -537,9 +537,7 @@ impl IoContext {
|
||||||
self.0.tag(),
|
self.0.tag(),
|
||||||
nbytes
|
nbytes
|
||||||
);
|
);
|
||||||
if !inner.dispatch_task.wake_checked() {
|
inner.dispatch_task.wake();
|
||||||
log::error!("Dispatcher waker is not registered");
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
if nbytes >= hw {
|
if nbytes >= hw {
|
||||||
// read task is paused because of read back-pressure
|
// read task is paused because of read back-pressure
|
||||||
|
@ -670,7 +668,6 @@ impl IoContext {
|
||||||
// set buffer back
|
// set buffer back
|
||||||
let result = match result {
|
let result = match result {
|
||||||
Ok(0) => {
|
Ok(0) => {
|
||||||
// log::debug!("{}: WROTE ALL {:?}", self.0.tag(), inner.buffer.write_destination_size());
|
|
||||||
self.0.memory_pool().release_write_buf(buf);
|
self.0.memory_pool().release_write_buf(buf);
|
||||||
Ok(inner.buffer.write_destination_size())
|
Ok(inner.buffer.write_destination_size())
|
||||||
}
|
}
|
||||||
|
@ -680,7 +677,6 @@ impl IoContext {
|
||||||
self.0.memory_pool().release_write_buf(b);
|
self.0.memory_pool().release_write_buf(b);
|
||||||
}
|
}
|
||||||
let l = buf.len();
|
let l = buf.len();
|
||||||
// log::debug!("{}: WROTE SOME {:?}", self.0.tag(), l);
|
|
||||||
inner.buffer.set_write_destination(buf);
|
inner.buffer.set_write_destination(buf);
|
||||||
Ok(l)
|
Ok(l)
|
||||||
}
|
}
|
||||||
|
@ -739,19 +735,11 @@ impl IoContext {
|
||||||
|
|
||||||
pub fn with_read_buf<F>(&self, f: F) -> Poll<()>
|
pub fn with_read_buf<F>(&self, f: F) -> Poll<()>
|
||||||
where
|
where
|
||||||
F: FnOnce(&mut BytesVec) -> Poll<io::Result<usize>>,
|
F: FnOnce(&mut BytesVec, usize, usize) -> Poll<io::Result<usize>>,
|
||||||
{
|
{
|
||||||
let inner = &self.0 .0;
|
let inner = &self.0 .0;
|
||||||
let (hw, lw) = self.0.memory_pool().read_params().unpack();
|
let (hw, lw) = self.0.memory_pool().read_params().unpack();
|
||||||
let result = inner.buffer.with_read_source(&self.0, |buf| {
|
let result = inner.buffer.with_read_source(&self.0, |buf| f(buf, hw, lw));
|
||||||
// make sure we've got room
|
|
||||||
let remaining = buf.remaining_mut();
|
|
||||||
if remaining < lw {
|
|
||||||
buf.reserve(hw - remaining);
|
|
||||||
}
|
|
||||||
|
|
||||||
f(buf)
|
|
||||||
});
|
|
||||||
|
|
||||||
// handle buffer changes
|
// handle buffer changes
|
||||||
match result {
|
match result {
|
||||||
|
@ -789,9 +777,7 @@ impl IoContext {
|
||||||
self.0.tag(),
|
self.0.tag(),
|
||||||
nbytes
|
nbytes
|
||||||
);
|
);
|
||||||
if !inner.dispatch_task.wake_checked() {
|
inner.dispatch_task.wake();
|
||||||
log::error!("Dispatcher waker is not registered");
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
if nbytes >= hw {
|
if nbytes >= hw {
|
||||||
// read task is paused because of read back-pressure
|
// read task is paused because of read back-pressure
|
||||||
|
|
|
@ -1,5 +1,11 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [2.5.11] - 2025-04-01
|
||||||
|
|
||||||
|
* Use edge mode for polling driver
|
||||||
|
|
||||||
|
* Use polling fork
|
||||||
|
|
||||||
## [2.5.10] - 2025-03-28
|
## [2.5.10] - 2025-03-28
|
||||||
|
|
||||||
* Better closed sockets handling
|
* Better closed sockets handling
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "ntex-net"
|
name = "ntex-net"
|
||||||
version = "2.5.10"
|
version = "2.5.11"
|
||||||
authors = ["ntex contributors <team@ntex.rs>"]
|
authors = ["ntex contributors <team@ntex.rs>"]
|
||||||
description = "ntexwork utils for ntex framework"
|
description = "ntexwork utils for ntex framework"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
@ -27,20 +27,20 @@ compio = ["ntex-rt/compio", "ntex-compio"]
|
||||||
# neon runtime
|
# neon runtime
|
||||||
neon = ["ntex-rt/neon", "ntex-neon", "slab", "socket2"]
|
neon = ["ntex-rt/neon", "ntex-neon", "slab", "socket2"]
|
||||||
|
|
||||||
polling = ["ntex-neon/polling", "dep:polling", "socket2"]
|
|
||||||
io-uring = ["ntex-neon/io-uring", "dep:io-uring", "socket2"]
|
io-uring = ["ntex-neon/io-uring", "dep:io-uring", "socket2"]
|
||||||
|
ntex-polling = ["ntex-neon/ntex-polling", "dep:ntex-polling", "socket2"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
ntex-service = "3.3"
|
ntex-service = "3.3"
|
||||||
ntex-bytes = "0.1"
|
ntex-bytes = "0.1"
|
||||||
ntex-http = "0.1"
|
ntex-http = "0.1"
|
||||||
ntex-io = "2.11.1"
|
ntex-io = "2.11.2"
|
||||||
ntex-rt = "0.4.25"
|
ntex-rt = "0.4.25"
|
||||||
ntex-util = "2.5"
|
ntex-util = "2.5"
|
||||||
|
|
||||||
ntex-tokio = { version = "0.5.3", optional = true }
|
ntex-tokio = { version = "0.5.3", optional = true }
|
||||||
ntex-compio = { version = "0.2.4", optional = true }
|
ntex-compio = { version = "0.2.4", optional = true }
|
||||||
ntex-neon = { version = "0.1.15", optional = true }
|
ntex-neon = { version = "0.1.16", optional = true }
|
||||||
|
|
||||||
bitflags = { workspace = true }
|
bitflags = { workspace = true }
|
||||||
cfg-if = { workspace = true }
|
cfg-if = { workspace = true }
|
||||||
|
@ -53,7 +53,8 @@ thiserror = { workspace = true }
|
||||||
# Linux specific dependencies
|
# Linux specific dependencies
|
||||||
[target.'cfg(target_os = "linux")'.dependencies]
|
[target.'cfg(target_os = "linux")'.dependencies]
|
||||||
io-uring = { workspace = true, optional = true }
|
io-uring = { workspace = true, optional = true }
|
||||||
polling = { workspace = true, optional = true }
|
ntex-polling = { workspace = true, optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
ntex = "2"
|
ntex = "2"
|
||||||
|
oneshot = "0.1"
|
||||||
|
|
|
@ -14,14 +14,14 @@ cfg_if::cfg_if! {
|
||||||
mod rt_impl;
|
mod rt_impl;
|
||||||
pub use self::rt_impl::{
|
pub use self::rt_impl::{
|
||||||
from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect,
|
from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect,
|
||||||
unix_connect_in,
|
unix_connect_in, active_stream_ops
|
||||||
};
|
};
|
||||||
} else if #[cfg(all(unix, feature = "neon"))] {
|
} else if #[cfg(all(unix, feature = "neon"))] {
|
||||||
#[path = "rt_polling/mod.rs"]
|
#[path = "rt_polling/mod.rs"]
|
||||||
mod rt_impl;
|
mod rt_impl;
|
||||||
pub use self::rt_impl::{
|
pub use self::rt_impl::{
|
||||||
from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect,
|
from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect,
|
||||||
unix_connect_in,
|
unix_connect_in, active_stream_ops
|
||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
pub use self::compat::*;
|
pub use self::compat::*;
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use std::os::fd::{AsRawFd, RawFd};
|
use std::os::fd::{AsRawFd, RawFd};
|
||||||
use std::{cell::RefCell, io, rc::Rc, task::Poll};
|
use std::{cell::RefCell, io, rc::Rc, task::Poll};
|
||||||
|
|
||||||
use ntex_neon::driver::{DriverApi, Event, Handler};
|
use ntex_neon::driver::{DriverApi, Event, Handler, PollMode};
|
||||||
use ntex_neon::{syscall, Runtime};
|
use ntex_neon::{syscall, Runtime};
|
||||||
use ntex_util::channel::oneshot::Sender;
|
use ntex_util::channel::oneshot::Sender;
|
||||||
use slab::Slab;
|
use slab::Slab;
|
||||||
|
@ -34,7 +34,7 @@ impl ConnectOps {
|
||||||
pub(crate) fn current() -> Self {
|
pub(crate) fn current() -> Self {
|
||||||
Runtime::value(|rt| {
|
Runtime::value(|rt| {
|
||||||
let mut inner = None;
|
let mut inner = None;
|
||||||
rt.driver().register(|api| {
|
rt.register_handler(|api| {
|
||||||
let ops = Rc::new(ConnectOpsInner {
|
let ops = Rc::new(ConnectOpsInner {
|
||||||
api,
|
api,
|
||||||
connects: RefCell::new(Slab::new()),
|
connects: RefCell::new(Slab::new()),
|
||||||
|
@ -62,7 +62,9 @@ impl ConnectOps {
|
||||||
let item = Item { fd, sender };
|
let item = Item { fd, sender };
|
||||||
let id = self.0.connects.borrow_mut().insert(item);
|
let id = self.0.connects.borrow_mut().insert(item);
|
||||||
|
|
||||||
self.0.api.attach(fd, id as u32, Some(Event::writable(0)));
|
self.0
|
||||||
|
.api
|
||||||
|
.attach(fd, id as u32, Event::writable(0), PollMode::Oneshot);
|
||||||
Ok(id)
|
Ok(id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -72,7 +74,6 @@ impl Handler for ConnectOpsBatcher {
|
||||||
log::debug!("connect-fd is readable {:?}", id);
|
log::debug!("connect-fd is readable {:?}", id);
|
||||||
|
|
||||||
let mut connects = self.inner.connects.borrow_mut();
|
let mut connects = self.inner.connects.borrow_mut();
|
||||||
|
|
||||||
if connects.contains(id) {
|
if connects.contains(id) {
|
||||||
let item = connects.remove(id);
|
let item = connects.remove(id);
|
||||||
if event.writable {
|
if event.writable {
|
||||||
|
|
|
@ -1,16 +1,16 @@
|
||||||
use std::os::fd::{AsRawFd, RawFd};
|
use std::os::fd::RawFd;
|
||||||
use std::{cell::Cell, cell::RefCell, future::Future, io, mem, rc::Rc, task, task::Poll};
|
use std::{cell::Cell, cell::RefCell, future::Future, io, rc::Rc, task::Poll};
|
||||||
|
|
||||||
use ntex_neon::driver::{DriverApi, Event, Handler};
|
use ntex_neon::driver::{DriverApi, Event, Handler, PollMode};
|
||||||
use ntex_neon::{syscall, Runtime};
|
use ntex_neon::{syscall, Runtime};
|
||||||
use slab::Slab;
|
use slab::Slab;
|
||||||
|
|
||||||
use ntex_bytes::BufMut;
|
use ntex_bytes::BufMut;
|
||||||
use ntex_io::IoContext;
|
use ntex_io::IoContext;
|
||||||
|
|
||||||
pub(crate) struct StreamCtl<T> {
|
pub(crate) struct StreamCtl {
|
||||||
id: u32,
|
id: u32,
|
||||||
inner: Rc<StreamOpsInner<T>>,
|
inner: Rc<StreamOpsInner>,
|
||||||
}
|
}
|
||||||
|
|
||||||
bitflags::bitflags! {
|
bitflags::bitflags! {
|
||||||
|
@ -18,41 +18,37 @@ bitflags::bitflags! {
|
||||||
struct Flags: u8 {
|
struct Flags: u8 {
|
||||||
const RD = 0b0000_0001;
|
const RD = 0b0000_0001;
|
||||||
const WR = 0b0000_0010;
|
const WR = 0b0000_0010;
|
||||||
|
const RDSH = 0b0000_0100;
|
||||||
|
const FAILED = 0b0000_1000;
|
||||||
|
const CLOSED = 0b0001_0000;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct StreamItem<T> {
|
struct StreamItem {
|
||||||
io: Option<T>,
|
|
||||||
fd: RawFd,
|
fd: RawFd,
|
||||||
flags: Flags,
|
flags: Flags,
|
||||||
ref_count: u16,
|
ref_count: u16,
|
||||||
context: IoContext,
|
context: IoContext,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) struct StreamOps<T>(Rc<StreamOpsInner<T>>);
|
pub(crate) struct StreamOps(Rc<StreamOpsInner>);
|
||||||
|
|
||||||
struct StreamOpsHandler<T> {
|
struct StreamOpsHandler {
|
||||||
inner: Rc<StreamOpsInner<T>>,
|
inner: Rc<StreamOpsInner>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct StreamOpsInner<T> {
|
struct StreamOpsInner {
|
||||||
api: DriverApi,
|
api: DriverApi,
|
||||||
delayd_drop: Cell<bool>,
|
delayd_drop: Cell<bool>,
|
||||||
feed: RefCell<Vec<u32>>,
|
feed: RefCell<Vec<u32>>,
|
||||||
streams: Cell<Option<Box<Slab<StreamItem<T>>>>>,
|
streams: Cell<Option<Box<Slab<StreamItem>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> StreamItem<T> {
|
impl StreamOps {
|
||||||
fn tag(&self) -> &'static str {
|
|
||||||
self.context.tag()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: AsRawFd + 'static> StreamOps<T> {
|
|
||||||
pub(crate) fn current() -> Self {
|
pub(crate) fn current() -> Self {
|
||||||
Runtime::value(|rt| {
|
Runtime::value(|rt| {
|
||||||
let mut inner = None;
|
let mut inner = None;
|
||||||
rt.driver().register(|api| {
|
rt.register_handler(|api| {
|
||||||
let ops = Rc::new(StreamOpsInner {
|
let ops = Rc::new(StreamOpsInner {
|
||||||
api,
|
api,
|
||||||
feed: RefCell::new(Vec::new()),
|
feed: RefCell::new(Vec::new()),
|
||||||
|
@ -67,13 +63,15 @@ impl<T: AsRawFd + 'static> StreamOps<T> {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl<T> {
|
pub(crate) fn active_ops() -> usize {
|
||||||
let fd = io.as_raw_fd();
|
Self::current().0.with(|streams| streams.len())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn register(&self, fd: RawFd, context: IoContext) -> StreamCtl {
|
||||||
let stream = self.0.with(move |streams| {
|
let stream = self.0.with(move |streams| {
|
||||||
let item = StreamItem {
|
let item = StreamItem {
|
||||||
fd,
|
fd,
|
||||||
context,
|
context,
|
||||||
io: Some(io),
|
|
||||||
ref_count: 1,
|
ref_count: 1,
|
||||||
flags: Flags::empty(),
|
flags: Flags::empty(),
|
||||||
};
|
};
|
||||||
|
@ -86,60 +84,40 @@ impl<T: AsRawFd + 'static> StreamOps<T> {
|
||||||
self.0.api.attach(
|
self.0.api.attach(
|
||||||
fd,
|
fd,
|
||||||
stream.id,
|
stream.id,
|
||||||
Some(Event::new(0, false, false).with_interrupt()),
|
Event::new(0, false, false).with_interrupt(),
|
||||||
|
PollMode::Oneshot,
|
||||||
);
|
);
|
||||||
stream
|
stream
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Clone for StreamOps<T> {
|
impl Clone for StreamOps {
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
Self(self.0.clone())
|
Self(self.0.clone())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Handler for StreamOpsHandler<T> {
|
impl Handler for StreamOpsHandler {
|
||||||
fn event(&mut self, id: usize, ev: Event) {
|
fn event(&mut self, id: usize, ev: Event) {
|
||||||
self.inner.with(|streams| {
|
self.inner.with(|streams| {
|
||||||
if !streams.contains(id) {
|
if !streams.contains(id) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let item = &mut streams[id];
|
let item = &mut streams[id];
|
||||||
if item.io.is_none() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev);
|
log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev);
|
||||||
|
|
||||||
// handle HUP
|
let mut renew = Event::new(0, false, false).with_interrupt();
|
||||||
if ev.is_interrupt() {
|
|
||||||
item.context.stopped(None);
|
|
||||||
close(id as u32, item, &self.inner.api, None, true);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut renew_ev = Event::new(0, false, false).with_interrupt();
|
|
||||||
|
|
||||||
if ev.readable {
|
if ev.readable {
|
||||||
let res = item.context.with_read_buf(|buf| {
|
let res = item.read();
|
||||||
let chunk = buf.chunk_mut();
|
|
||||||
let result = task::ready!(syscall!(
|
|
||||||
break libc::read(item.fd, chunk.as_mut_ptr() as _, chunk.len())
|
|
||||||
));
|
|
||||||
if let Ok(size) = result {
|
|
||||||
log::debug!("{}: data {:?}, s: {:?}", item.tag(), item.fd, size);
|
|
||||||
unsafe { buf.advance_mut(size) };
|
|
||||||
}
|
|
||||||
Poll::Ready(result)
|
|
||||||
});
|
|
||||||
|
|
||||||
if res.is_pending() && item.context.is_read_ready() {
|
if res.is_pending() && item.context.is_read_ready() {
|
||||||
renew_ev.readable = true;
|
renew.readable = true;
|
||||||
item.flags.insert(Flags::RD);
|
item.flags.insert(Flags::RD);
|
||||||
} else {
|
} else {
|
||||||
item.flags.remove(Flags::RD);
|
item.flags.remove(Flags::RD);
|
||||||
}
|
}
|
||||||
} else if item.flags.contains(Flags::RD) {
|
} else if item.flags.contains(Flags::RD) {
|
||||||
renew_ev.readable = true;
|
renew.readable = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ev.writable {
|
if ev.writable {
|
||||||
|
@ -148,16 +126,26 @@ impl<T> Handler for StreamOpsHandler<T> {
|
||||||
syscall!(break libc::write(item.fd, buf[..].as_ptr() as _, buf.len()))
|
syscall!(break libc::write(item.fd, buf[..].as_ptr() as _, buf.len()))
|
||||||
});
|
});
|
||||||
if result.is_pending() {
|
if result.is_pending() {
|
||||||
renew_ev.writable = true;
|
renew.writable = true;
|
||||||
item.flags.insert(Flags::WR);
|
item.flags.insert(Flags::WR);
|
||||||
} else {
|
} else {
|
||||||
item.flags.remove(Flags::WR);
|
item.flags.remove(Flags::WR);
|
||||||
}
|
}
|
||||||
} else if item.flags.contains(Flags::WR) {
|
} else if item.flags.contains(Flags::WR) {
|
||||||
renew_ev.writable = true;
|
renew.writable = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
self.inner.api.modify(item.fd, id as u32, renew_ev);
|
// handle HUP
|
||||||
|
if ev.is_interrupt() {
|
||||||
|
item.close(id as u32, &self.inner.api, None, false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !item.flags.contains(Flags::CLOSED | Flags::FAILED) {
|
||||||
|
self.inner
|
||||||
|
.api
|
||||||
|
.modify(item.fd, id as u32, renew, PollMode::Oneshot);
|
||||||
|
}
|
||||||
|
|
||||||
// delayed drops
|
// delayed drops
|
||||||
if self.inner.delayd_drop.get() {
|
if self.inner.delayd_drop.get() {
|
||||||
|
@ -167,13 +155,12 @@ impl<T> Handler for StreamOpsHandler<T> {
|
||||||
if item.ref_count == 0 {
|
if item.ref_count == 0 {
|
||||||
let mut item = streams.remove(id as usize);
|
let mut item = streams.remove(id as usize);
|
||||||
log::debug!(
|
log::debug!(
|
||||||
"{}: Drop ({}), {:?}, has-io: {}",
|
"{}: Drop ({:?}), flags: {:?}",
|
||||||
item.tag(),
|
item.tag(),
|
||||||
id,
|
|
||||||
item.fd,
|
item.fd,
|
||||||
item.io.is_some()
|
item.flags
|
||||||
);
|
);
|
||||||
close(id, &mut item, &self.inner.api, None, true);
|
item.close(id, &self.inner.api, None, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.inner.delayd_drop.set(false);
|
self.inner.delayd_drop.set(false);
|
||||||
|
@ -191,16 +178,16 @@ impl<T> Handler for StreamOpsHandler<T> {
|
||||||
item.fd,
|
item.fd,
|
||||||
err
|
err
|
||||||
);
|
);
|
||||||
close(id as u32, item, &self.inner.api, Some(err), false);
|
item.close(id as u32, &self.inner.api, Some(err), false);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> StreamOpsInner<T> {
|
impl StreamOpsInner {
|
||||||
fn with<F, R>(&self, f: F) -> R
|
fn with<F, R>(&self, f: F) -> R
|
||||||
where
|
where
|
||||||
F: FnOnce(&mut Slab<StreamItem<T>>) -> R,
|
F: FnOnce(&mut Slab<StreamItem>) -> R,
|
||||||
{
|
{
|
||||||
let mut streams = self.streams.take().unwrap();
|
let mut streams = self.streams.take().unwrap();
|
||||||
let result = f(&mut streams);
|
let result = f(&mut streams);
|
||||||
|
@ -209,39 +196,112 @@ impl<T> StreamOpsInner<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn close<T>(
|
impl StreamItem {
|
||||||
id: u32,
|
fn tag(&self) -> &'static str {
|
||||||
item: &mut StreamItem<T>,
|
self.context.tag()
|
||||||
api: &DriverApi,
|
}
|
||||||
error: Option<io::Error>,
|
|
||||||
shutdown: bool,
|
fn read(&mut self) -> Poll<()> {
|
||||||
) -> Option<ntex_rt::JoinHandle<io::Result<i32>>> {
|
let mut flags = self.flags;
|
||||||
if let Some(io) = item.io.take() {
|
let result = self.context.with_read_buf(|buf, hw, lw| {
|
||||||
log::debug!("{}: Closing ({}), {:?}", item.tag(), id, item.fd);
|
// prev call result is 0
|
||||||
mem::forget(io);
|
if flags.contains(Flags::RDSH) {
|
||||||
if let Some(err) = error {
|
return Poll::Ready(Ok(0));
|
||||||
item.context.stopped(Some(err));
|
|
||||||
}
|
|
||||||
let fd = item.fd;
|
|
||||||
api.detach(fd, id);
|
|
||||||
Some(ntex_rt::spawn_blocking(move || {
|
|
||||||
if shutdown {
|
|
||||||
let _ = syscall!(libc::shutdown(fd, libc::SHUT_RDWR));
|
|
||||||
}
|
}
|
||||||
syscall!(libc::close(fd))
|
|
||||||
}))
|
let mut total = 0;
|
||||||
} else {
|
loop {
|
||||||
None
|
// make sure we've got room
|
||||||
|
let remaining = buf.remaining_mut();
|
||||||
|
if remaining < lw {
|
||||||
|
buf.reserve(hw - remaining);
|
||||||
|
}
|
||||||
|
|
||||||
|
let chunk = buf.chunk_mut();
|
||||||
|
let chunk_len = chunk.len();
|
||||||
|
let chunk_ptr = chunk.as_mut_ptr();
|
||||||
|
|
||||||
|
let result =
|
||||||
|
syscall!(break libc::read(self.fd, chunk_ptr as _, chunk.len()));
|
||||||
|
if let Poll::Ready(Ok(size)) = result {
|
||||||
|
unsafe { buf.advance_mut(size) };
|
||||||
|
total += size;
|
||||||
|
if size == chunk_len {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log::debug!(
|
||||||
|
"{}: read fd ({:?}), s: {:?}, cap: {:?}, result: {:?}",
|
||||||
|
self.tag(),
|
||||||
|
self.fd,
|
||||||
|
total,
|
||||||
|
buf.remaining_mut(),
|
||||||
|
result
|
||||||
|
);
|
||||||
|
|
||||||
|
return match result {
|
||||||
|
Poll::Ready(Err(err)) => {
|
||||||
|
flags.insert(Flags::FAILED);
|
||||||
|
if total > 0 {
|
||||||
|
self.context.stopped(Some(err));
|
||||||
|
Poll::Ready(Ok(total))
|
||||||
|
} else {
|
||||||
|
Poll::Ready(Err(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Poll::Ready(Ok(size)) => {
|
||||||
|
if size == 0 {
|
||||||
|
flags.insert(Flags::RDSH);
|
||||||
|
}
|
||||||
|
Poll::Ready(Ok(total))
|
||||||
|
}
|
||||||
|
Poll::Pending => {
|
||||||
|
if total > 0 {
|
||||||
|
Poll::Ready(Ok(total))
|
||||||
|
} else {
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
});
|
||||||
|
self.flags = flags;
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
|
fn close(
|
||||||
|
&mut self,
|
||||||
|
id: u32,
|
||||||
|
api: &DriverApi,
|
||||||
|
error: Option<io::Error>,
|
||||||
|
shutdown: bool,
|
||||||
|
) -> Option<ntex_rt::JoinHandle<io::Result<i32>>> {
|
||||||
|
if !self.flags.contains(Flags::CLOSED) {
|
||||||
|
log::debug!("{}: Closing ({}), {:?}", self.tag(), id, self.fd);
|
||||||
|
self.flags.insert(Flags::CLOSED);
|
||||||
|
self.context.stopped(error);
|
||||||
|
|
||||||
|
let fd = self.fd;
|
||||||
|
api.detach(fd, id);
|
||||||
|
Some(ntex_rt::spawn_blocking(move || {
|
||||||
|
if shutdown {
|
||||||
|
let _ = syscall!(libc::shutdown(fd, libc::SHUT_RDWR));
|
||||||
|
}
|
||||||
|
syscall!(libc::close(fd))
|
||||||
|
}))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> StreamCtl<T> {
|
impl StreamCtl {
|
||||||
pub(crate) fn close(self) -> impl Future<Output = io::Result<()>> {
|
pub(crate) fn close(self) -> impl Future<Output = io::Result<()>> {
|
||||||
let id = self.id as usize;
|
let id = self.id as usize;
|
||||||
let fut = self.inner.with(|streams| {
|
let fut = self
|
||||||
let item = &mut streams[id];
|
.inner
|
||||||
close(self.id, item, &self.inner.api, None, false)
|
.with(|streams| streams[id].close(self.id, &self.inner.api, None, true));
|
||||||
});
|
|
||||||
async move {
|
async move {
|
||||||
if let Some(fut) = fut {
|
if let Some(fut) = fut {
|
||||||
fut.await
|
fut.await
|
||||||
|
@ -252,55 +312,38 @@ impl<T> StreamCtl<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn with_io<F, R>(&self, f: F) -> R
|
pub(crate) fn modify(&self, rd: bool, wr: bool) -> bool {
|
||||||
where
|
|
||||||
F: FnOnce(Option<&T>) -> R,
|
|
||||||
{
|
|
||||||
self.inner
|
|
||||||
.with(|streams| f(streams[self.id as usize].io.as_ref()))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn modify(&self, rd: bool, wr: bool) {
|
|
||||||
self.inner.with(|streams| {
|
self.inner.with(|streams| {
|
||||||
let item = &mut streams[self.id as usize];
|
let item = &mut streams[self.id as usize];
|
||||||
|
if item.flags.contains(Flags::CLOSED) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
log::debug!(
|
log::debug!(
|
||||||
"{}: Modify interest ({}), {:?} rd: {:?}, wr: {:?}",
|
"{}: Modify interest ({:?}) rd: {:?}, wr: {:?}",
|
||||||
item.tag(),
|
item.tag(),
|
||||||
self.id,
|
|
||||||
item.fd,
|
item.fd,
|
||||||
rd,
|
rd,
|
||||||
wr
|
wr
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let mut changed = false;
|
||||||
let mut event = Event::new(0, false, false).with_interrupt();
|
let mut event = Event::new(0, false, false).with_interrupt();
|
||||||
|
|
||||||
if rd {
|
if rd {
|
||||||
if item.flags.contains(Flags::RD) {
|
if item.flags.contains(Flags::RD) {
|
||||||
event.readable = true;
|
event.readable = true;
|
||||||
} else {
|
} else {
|
||||||
let res = item.context.with_read_buf(|buf| {
|
let res = item.read();
|
||||||
let chunk = buf.chunk_mut();
|
|
||||||
let result = task::ready!(syscall!(
|
|
||||||
break libc::read(item.fd, chunk.as_mut_ptr() as _, chunk.len())
|
|
||||||
));
|
|
||||||
if let Ok(size) = result {
|
|
||||||
log::debug!(
|
|
||||||
"{}: read {:?}, s: {:?}",
|
|
||||||
item.tag(),
|
|
||||||
item.fd,
|
|
||||||
size
|
|
||||||
);
|
|
||||||
unsafe { buf.advance_mut(size) };
|
|
||||||
}
|
|
||||||
Poll::Ready(result)
|
|
||||||
});
|
|
||||||
|
|
||||||
if res.is_pending() && item.context.is_read_ready() {
|
if res.is_pending() && item.context.is_read_ready() {
|
||||||
|
changed = true;
|
||||||
event.readable = true;
|
event.readable = true;
|
||||||
item.flags.insert(Flags::RD);
|
item.flags.insert(Flags::RD);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if item.flags.contains(Flags::RD) {
|
||||||
|
changed = true;
|
||||||
|
item.flags.remove(Flags::RD);
|
||||||
}
|
}
|
||||||
|
|
||||||
if wr {
|
if wr {
|
||||||
|
@ -320,18 +363,27 @@ impl<T> StreamCtl<T> {
|
||||||
});
|
});
|
||||||
|
|
||||||
if result.is_pending() {
|
if result.is_pending() {
|
||||||
|
changed = true;
|
||||||
event.writable = true;
|
event.writable = true;
|
||||||
item.flags.insert(Flags::WR);
|
item.flags.insert(Flags::WR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if item.flags.contains(Flags::WR) {
|
||||||
|
changed = true;
|
||||||
|
item.flags.remove(Flags::WR);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.inner.api.modify(item.fd, self.id, event);
|
if changed && !item.flags.contains(Flags::CLOSED | Flags::FAILED) {
|
||||||
|
self.inner
|
||||||
|
.api
|
||||||
|
.modify(item.fd, self.id, event, PollMode::Oneshot);
|
||||||
|
}
|
||||||
|
true
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Clone for StreamCtl<T> {
|
impl Clone for StreamCtl {
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
self.inner.with(|streams| {
|
self.inner.with(|streams| {
|
||||||
streams[self.id as usize].ref_count += 1;
|
streams[self.id as usize].ref_count += 1;
|
||||||
|
@ -343,7 +395,7 @@ impl<T> Clone for StreamCtl<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Drop for StreamCtl<T> {
|
impl Drop for StreamCtl {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if let Some(mut streams) = self.inner.streams.take() {
|
if let Some(mut streams) = self.inner.streams.take() {
|
||||||
let id = self.id as usize;
|
let id = self.id as usize;
|
||||||
|
@ -351,13 +403,12 @@ impl<T> Drop for StreamCtl<T> {
|
||||||
if streams[id].ref_count == 0 {
|
if streams[id].ref_count == 0 {
|
||||||
let mut item = streams.remove(id);
|
let mut item = streams.remove(id);
|
||||||
log::debug!(
|
log::debug!(
|
||||||
"{}: Drop io ({}), {:?}, has-io: {}",
|
"{}: Drop io ({:?}), flags: {:?}",
|
||||||
item.tag(),
|
item.tag(),
|
||||||
self.id,
|
|
||||||
item.fd,
|
item.fd,
|
||||||
item.io.is_some()
|
item.flags
|
||||||
);
|
);
|
||||||
close(self.id, &mut item, &self.inner.api, None, true);
|
item.close(self.id, &self.inner.api, None, true);
|
||||||
}
|
}
|
||||||
self.inner.streams.set(Some(streams));
|
self.inner.streams.set(Some(streams));
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use std::{any, future::poll_fn, task::Poll};
|
use std::{any, future::poll_fn, mem, os::fd::AsRawFd, task::Poll};
|
||||||
|
|
||||||
use ntex_io::{
|
use ntex_io::{
|
||||||
types, Handle, IoContext, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus,
|
types, Handle, IoContext, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus,
|
||||||
|
@ -12,11 +12,10 @@ impl IoStream for super::TcpStream {
|
||||||
fn start(self, read: ReadContext, _: WriteContext) -> Option<Box<dyn Handle>> {
|
fn start(self, read: ReadContext, _: WriteContext) -> Option<Box<dyn Handle>> {
|
||||||
let io = self.0;
|
let io = self.0;
|
||||||
let context = read.context();
|
let context = read.context();
|
||||||
let ctl = StreamOps::current().register(io, context.clone());
|
let ctl = StreamOps::current().register(io.as_raw_fd(), context.clone());
|
||||||
let ctl2 = ctl.clone();
|
|
||||||
spawn(async move { run(ctl, context).await });
|
spawn(async move { run(ctl, context).await });
|
||||||
|
|
||||||
Some(Box::new(HandleWrapper(ctl2)))
|
Some(Box::new(HandleWrapper(Some(io))))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,19 +23,20 @@ impl IoStream for super::UnixStream {
|
||||||
fn start(self, read: ReadContext, _: WriteContext) -> Option<Box<dyn Handle>> {
|
fn start(self, read: ReadContext, _: WriteContext) -> Option<Box<dyn Handle>> {
|
||||||
let io = self.0;
|
let io = self.0;
|
||||||
let context = read.context();
|
let context = read.context();
|
||||||
let ctl = StreamOps::current().register(io, context.clone());
|
let ctl = StreamOps::current().register(io.as_raw_fd(), context.clone());
|
||||||
spawn(async move { run(ctl, context).await });
|
spawn(async move { run(ctl, context).await });
|
||||||
|
mem::forget(io);
|
||||||
|
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct HandleWrapper(StreamCtl<Socket>);
|
struct HandleWrapper(Option<Socket>);
|
||||||
|
|
||||||
impl Handle for HandleWrapper {
|
impl Handle for HandleWrapper {
|
||||||
fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
|
fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
|
||||||
if id == any::TypeId::of::<types::PeerAddr>() {
|
if id == any::TypeId::of::<types::PeerAddr>() {
|
||||||
let addr = self.0.with_io(|io| io.and_then(|io| io.peer_addr().ok()));
|
let addr = self.0.as_ref().unwrap().peer_addr().ok();
|
||||||
if let Some(addr) = addr.and_then(|addr| addr.as_socket()) {
|
if let Some(addr) = addr.and_then(|addr| addr.as_socket()) {
|
||||||
return Some(Box::new(types::PeerAddr(addr)));
|
return Some(Box::new(types::PeerAddr(addr)));
|
||||||
}
|
}
|
||||||
|
@ -45,13 +45,19 @@ impl Handle for HandleWrapper {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Drop for HandleWrapper {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
mem::forget(self.0.take());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
|
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
|
||||||
enum Status {
|
enum Status {
|
||||||
Shutdown,
|
Shutdown,
|
||||||
Terminate,
|
Terminate,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run<T>(ctl: StreamCtl<T>, context: IoContext) {
|
async fn run(ctl: StreamCtl, context: IoContext) {
|
||||||
// Handle io read readiness
|
// Handle io read readiness
|
||||||
let st = poll_fn(|cx| {
|
let st = poll_fn(|cx| {
|
||||||
let mut modify = false;
|
let mut modify = false;
|
||||||
|
@ -82,7 +88,9 @@ async fn run<T>(ctl: StreamCtl<T>, context: IoContext) {
|
||||||
};
|
};
|
||||||
|
|
||||||
if modify {
|
if modify {
|
||||||
ctl.modify(readable, writable);
|
if !ctl.modify(readable, writable) {
|
||||||
|
return Poll::Ready(Status::Terminate);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if read.is_pending() && write.is_pending() {
|
if read.is_pending() && write.is_pending() {
|
||||||
|
@ -95,7 +103,10 @@ async fn run<T>(ctl: StreamCtl<T>, context: IoContext) {
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
ctl.modify(false, true);
|
if st != Status::Terminate {
|
||||||
context.shutdown(st == Status::Shutdown).await;
|
if ctl.modify(false, true) {
|
||||||
|
context.shutdown(st == Status::Shutdown).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
context.stopped(ctl.close().await.err());
|
context.stopped(ctl.close().await.err());
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,3 +67,68 @@ pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result<Io> {
|
||||||
Socket::from(stream),
|
Socket::from(stream),
|
||||||
)?)))
|
)?)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[doc(hidden)]
|
||||||
|
/// Get number of active Io objects
|
||||||
|
pub fn active_stream_ops() -> usize {
|
||||||
|
self::driver::StreamOps::active_ops()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(all(target_os = "linux", feature = "neon"))]
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use ntex::{io::Io, time::sleep, time::Millis, util::PoolId};
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use crate::connect::Connect;
|
||||||
|
|
||||||
|
const DATA: &[u8] = b"Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
|
Hello World Hello World Hello World Hello World Hello World";
|
||||||
|
|
||||||
|
#[ntex::test]
|
||||||
|
async fn idle_disconnect() {
|
||||||
|
PoolId::P5.set_read_params(24, 12);
|
||||||
|
let (tx, rx) = ::oneshot::channel();
|
||||||
|
let tx = Arc::new(Mutex::new(Some(tx)));
|
||||||
|
|
||||||
|
let server = ntex::server::test_server(move || {
|
||||||
|
let tx = tx.clone();
|
||||||
|
ntex_service::fn_service(move |io: Io<_>| {
|
||||||
|
tx.lock().unwrap().take().unwrap().send(()).unwrap();
|
||||||
|
|
||||||
|
async move {
|
||||||
|
io.write(DATA).unwrap();
|
||||||
|
sleep(Millis(250)).await;
|
||||||
|
io.close();
|
||||||
|
Ok::<_, ()>(())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let msg = Connect::new(server.addr());
|
||||||
|
let io = crate::connect::connect(msg).await.unwrap();
|
||||||
|
io.set_memory_pool(PoolId::P5.into());
|
||||||
|
rx.await.unwrap();
|
||||||
|
|
||||||
|
io.on_disconnect().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -31,7 +31,7 @@ impl ConnectOps {
|
||||||
pub(crate) fn current() -> Self {
|
pub(crate) fn current() -> Self {
|
||||||
Runtime::value(|rt| {
|
Runtime::value(|rt| {
|
||||||
let mut inner = None;
|
let mut inner = None;
|
||||||
rt.driver().register(|api| {
|
rt.register_handler(|api| {
|
||||||
if !api.is_supported(opcode::Connect::CODE) {
|
if !api.is_supported(opcode::Connect::CODE) {
|
||||||
panic!("opcode::Connect is required for io-uring support");
|
panic!("opcode::Connect is required for io-uring support");
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,7 +77,7 @@ impl<T: os::fd::AsRawFd + 'static> StreamOps<T> {
|
||||||
pub(crate) fn current() -> Self {
|
pub(crate) fn current() -> Self {
|
||||||
Runtime::value(|rt| {
|
Runtime::value(|rt| {
|
||||||
let mut inner = None;
|
let mut inner = None;
|
||||||
rt.driver().register(|api| {
|
rt.register_handler(|api| {
|
||||||
if !api.is_supported(opcode::Recv::CODE) {
|
if !api.is_supported(opcode::Recv::CODE) {
|
||||||
panic!("opcode::Recv is required for io-uring support");
|
panic!("opcode::Recv is required for io-uring support");
|
||||||
}
|
}
|
||||||
|
@ -124,6 +124,10 @@ impl<T: os::fd::AsRawFd + 'static> StreamOps<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn active_ops() -> usize {
|
||||||
|
Self::current().with(|st| st.streams.len())
|
||||||
|
}
|
||||||
|
|
||||||
fn with<F, R>(&self, f: F) -> R
|
fn with<F, R>(&self, f: F) -> R
|
||||||
where
|
where
|
||||||
F: FnOnce(&mut StreamOpsStorage<T>) -> R,
|
F: FnOnce(&mut StreamOpsStorage<T>) -> R,
|
||||||
|
|
|
@ -64,3 +64,9 @@ pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result<Io> {
|
||||||
Socket::from(stream),
|
Socket::from(stream),
|
||||||
)?)))
|
)?)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[doc(hidden)]
|
||||||
|
/// Get number of active Io objects
|
||||||
|
pub fn active_stream_ops() -> usize {
|
||||||
|
self::driver::StreamOps::<socket2::Socket>::active_ops()
|
||||||
|
}
|
||||||
|
|
|
@ -42,4 +42,4 @@ tok-io = { version = "1", package = "tokio", default-features = false, features
|
||||||
"net",
|
"net",
|
||||||
], optional = true }
|
], optional = true }
|
||||||
|
|
||||||
ntex-neon = { version = "0.1.14", optional = true }
|
ntex-neon = { version = "0.1.16", optional = true }
|
||||||
|
|
|
@ -265,7 +265,7 @@ mod neon {
|
||||||
let rt = Runtime::new().unwrap();
|
let rt = Runtime::new().unwrap();
|
||||||
log::info!(
|
log::info!(
|
||||||
"Starting neon runtime, driver {:?}",
|
"Starting neon runtime, driver {:?}",
|
||||||
rt.driver().tp().name()
|
rt.driver_type().name()
|
||||||
);
|
);
|
||||||
|
|
||||||
rt.block_on(fut);
|
rt.block_on(fut);
|
||||||
|
|
|
@ -252,8 +252,6 @@ where
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
thread::sleep(std::time::Duration::from_millis(150));
|
|
||||||
|
|
||||||
let (system, server, addr) = rx.recv().unwrap();
|
let (system, server, addr) = rx.recv().unwrap();
|
||||||
|
|
||||||
TestServer {
|
TestServer {
|
||||||
|
|
|
@ -1,9 +1,8 @@
|
||||||
use std::{io, rc::Rc};
|
use std::{io, rc::Rc};
|
||||||
|
|
||||||
use ntex::codec::BytesCodec;
|
|
||||||
use ntex::connect::Connect;
|
|
||||||
use ntex::io::{types::PeerAddr, Io};
|
use ntex::io::{types::PeerAddr, Io};
|
||||||
use ntex::service::{chain_factory, fn_service, Pipeline, ServiceFactory};
|
use ntex::service::{chain_factory, fn_service, Pipeline, ServiceFactory};
|
||||||
|
use ntex::{codec::BytesCodec, connect::Connect};
|
||||||
use ntex::{server::build_test_server, server::test_server, time, util::Bytes};
|
use ntex::{server::build_test_server, server::test_server, time, util::Bytes};
|
||||||
|
|
||||||
#[cfg(feature = "rustls")]
|
#[cfg(feature = "rustls")]
|
||||||
|
|
|
@ -682,15 +682,18 @@ async fn client_read_until_eof() {
|
||||||
for stream in lst.incoming() {
|
for stream in lst.incoming() {
|
||||||
if let Ok(mut stream) = stream {
|
if let Ok(mut stream) = stream {
|
||||||
let mut b = [0; 1000];
|
let mut b = [0; 1000];
|
||||||
let _ = stream.read(&mut b).unwrap();
|
log::debug!("Reading request");
|
||||||
let _ = stream
|
let res = stream.read(&mut b).unwrap();
|
||||||
|
log::debug!("Read {:?}", res);
|
||||||
|
let res = stream
|
||||||
.write_all(b"HTTP/1.0 200 OK\r\nconnection: close\r\n\r\nwelcome!");
|
.write_all(b"HTTP/1.0 200 OK\r\nconnection: close\r\n\r\nwelcome!");
|
||||||
|
log::debug!("Sent {:?}", res);
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
sleep(Millis(300)).await;
|
sleep(Millis(500)).await;
|
||||||
|
|
||||||
// client request
|
// client request
|
||||||
let req = Client::build()
|
let req = Client::build()
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue