Refactor driver

This commit is contained in:
Nikolay Kim 2025-04-01 17:10:25 +05:00
parent 315cf77668
commit 1fefbf2e6f
8 changed files with 213 additions and 246 deletions

View file

@ -98,17 +98,19 @@ impl IoState {
} }
pub(super) fn io_stopped(&self, err: Option<io::Error>) { pub(super) fn io_stopped(&self, err: Option<io::Error>) {
if err.is_some() { if !self.flags.get().contains(Flags::IO_STOPPED) {
self.error.set(err); if err.is_some() {
self.error.set(err);
}
self.read_task.wake();
self.write_task.wake();
self.dispatch_task.wake();
self.notify_disconnect();
self.handle.take();
self.insert_flags(
Flags::IO_STOPPED | Flags::IO_STOPPING | Flags::IO_STOPPING_FILTERS,
);
} }
self.read_task.wake();
self.write_task.wake();
self.dispatch_task.wake();
self.notify_disconnect();
self.handle.take();
self.insert_flags(
Flags::IO_STOPPED | Flags::IO_STOPPING | Flags::IO_STOPPING_FILTERS,
);
} }
/// Gracefully shutdown read and write io tasks /// Gracefully shutdown read and write io tasks

View file

@ -537,9 +537,7 @@ impl IoContext {
self.0.tag(), self.0.tag(),
nbytes nbytes
); );
if !inner.dispatch_task.wake_checked() { inner.dispatch_task.wake();
log::error!("Dispatcher waker is not registered");
}
} else { } else {
if nbytes >= hw { if nbytes >= hw {
// read task is paused because of read back-pressure // read task is paused because of read back-pressure
@ -779,11 +777,7 @@ impl IoContext {
self.0.tag(), self.0.tag(),
nbytes nbytes
); );
if !inner.dispatch_task.wake_checked() { inner.dispatch_task.wake();
log::error!(
"{}: Dispatcher waker is not registered, bytes: {:?}, flags: {:?}",
self.0.tag(), status.nbytes, self.flags());
}
} else { } else {
if nbytes >= hw { if nbytes >= hw {
// read task is paused because of read back-pressure // read task is paused because of read back-pressure

View file

@ -14,7 +14,7 @@ cfg_if::cfg_if! {
mod rt_impl; mod rt_impl;
pub use self::rt_impl::{ pub use self::rt_impl::{
from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect, from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect,
unix_connect_in, unix_connect_in, active_stream_ops
}; };
} else if #[cfg(all(unix, feature = "neon"))] { } else if #[cfg(all(unix, feature = "neon"))] {
#[path = "rt_polling/mod.rs"] #[path = "rt_polling/mod.rs"]

View file

@ -1,72 +1,50 @@
use std::os::fd::{AsRawFd, RawFd}; use std::os::fd::RawFd;
use std::{cell::Cell, cell::RefCell, future::Future, io, mem, rc::Rc, task::Poll}; use std::{cell::Cell, cell::RefCell, future::Future, io, rc::Rc, task::Poll};
use ntex_neon::driver::{DriverApi, Event, Handler, PollMode}; use ntex_neon::driver::{DriverApi, Event, Handler, PollMode};
use ntex_neon::{syscall, Runtime}; use ntex_neon::{syscall, Runtime};
use slab::Slab; use slab::Slab;
use ntex_bytes::{BufMut, BytesVec}; use ntex_bytes::BufMut;
use ntex_io::IoContext; use ntex_io::IoContext;
pub(crate) struct StreamCtl<T> { pub(crate) struct StreamCtl {
id: u32, id: u32,
inner: Rc<StreamOpsInner<T>>, inner: Rc<StreamOpsInner>,
} }
bitflags::bitflags! { bitflags::bitflags! {
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
struct Flags: u8 { struct Flags: u8 {
const RD = 0b0000_0001; const RD = 0b0000_0001;
const WR = 0b0000_0010; const WR = 0b0000_0010;
const ERR = 0b0000_0100; const RDSH = 0b0000_0100;
const RDSH = 0b0000_1000; const FAILED = 0b0000_1000;
const CLOSED = 0b0001_0000;
} }
} }
struct StreamItem<T> { struct StreamItem {
io: Option<T>,
fd: RawFd, fd: RawFd,
flags: Cell<Flags>, flags: Flags,
ref_count: u16, ref_count: u16,
context: IoContext, context: IoContext,
} }
pub(crate) struct StreamOps<T>(Rc<StreamOpsInner<T>>); pub(crate) struct StreamOps(Rc<StreamOpsInner>);
struct StreamOpsHandler<T> { struct StreamOpsHandler {
inner: Rc<StreamOpsInner<T>>, inner: Rc<StreamOpsInner>,
} }
struct StreamOpsInner<T> { struct StreamOpsInner {
api: DriverApi, api: DriverApi,
delayd_drop: Cell<bool>, delayd_drop: Cell<bool>,
feed: RefCell<Vec<u32>>, feed: RefCell<Vec<u32>>,
streams: Cell<Option<Box<Slab<StreamItem<T>>>>>, streams: Cell<Option<Box<Slab<StreamItem>>>>,
} }
impl<T> StreamItem<T> { impl StreamOps {
fn tag(&self) -> &'static str {
self.context.tag()
}
fn contains(&self, flag: Flags) -> bool {
self.flags.get().contains(flag)
}
fn insert(&self, fl: Flags) {
let mut flags = self.flags.get();
flags.insert(fl);
self.flags.set(flags);
}
fn remove(&self, fl: Flags) {
let mut flags = self.flags.get();
flags.remove(fl);
self.flags.set(flags);
}
}
impl<T: AsRawFd + 'static> StreamOps<T> {
pub(crate) fn current() -> Self { pub(crate) fn current() -> Self {
Runtime::value(|rt| { Runtime::value(|rt| {
let mut inner = None; let mut inner = None;
@ -89,15 +67,13 @@ impl<T: AsRawFd + 'static> StreamOps<T> {
Self::current().0.with(|streams| streams.len()) Self::current().0.with(|streams| streams.len())
} }
pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl<T> { pub(crate) fn register(&self, fd: RawFd, context: IoContext) -> StreamCtl {
let fd = io.as_raw_fd();
let stream = self.0.with(move |streams| { let stream = self.0.with(move |streams| {
let item = StreamItem { let item = StreamItem {
fd, fd,
context, context,
io: Some(io),
ref_count: 1, ref_count: 1,
flags: Cell::new(Flags::empty()), flags: Flags::empty(),
}; };
StreamCtl { StreamCtl {
id: streams.insert(item) as u32, id: streams.insert(item) as u32,
@ -115,72 +91,61 @@ impl<T: AsRawFd + 'static> StreamOps<T> {
} }
} }
impl<T> Clone for StreamOps<T> { impl Clone for StreamOps {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self(self.0.clone()) Self(self.0.clone())
} }
} }
impl<T> Handler for StreamOpsHandler<T> { impl Handler for StreamOpsHandler {
fn event(&mut self, id: usize, ev: Event) { fn event(&mut self, id: usize, ev: Event) {
self.inner.with(|streams| { self.inner.with(|streams| {
if !streams.contains(id) { if !streams.contains(id) {
return; return;
} }
let item = &mut streams[id]; let item = &mut streams[id];
if item.io.is_none() || item.contains(Flags::ERR) {
item.context.stopped(None);
return;
}
log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev); log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev);
let mut renew_ev = Event::new(0, false, false).with_interrupt(); let mut renew = Event::new(0, false, false).with_interrupt();
// handle read op
if ev.readable { if ev.readable {
let res = item let res = item.read();
.context
.with_read_buf(|buf, hw, lw| read(item, buf, hw, lw));
if res.is_pending() && item.context.is_read_ready() { if res.is_pending() && item.context.is_read_ready() {
renew_ev.readable = true; renew.readable = true;
item.flags.insert(Flags::RD);
} else { } else {
item.remove(Flags::RD); item.flags.remove(Flags::RD);
} }
} else if item.contains(Flags::RD) { } else if item.flags.contains(Flags::RD) {
renew_ev.readable = true; renew.readable = true;
} }
// handle HUP
if ev.is_interrupt() && !item.contains(Flags::ERR) {
item.context.stopped(None);
close(id as u32, item, &self.inner.api);
return;
}
// handle error
if ev.is_err() == Some(true) || ev.is_interrupt() {
item.insert(Flags::ERR);
}
// handle write op
if ev.writable { if ev.writable {
let result = item.context.with_write_buf(|buf| { let result = item.context.with_write_buf(|buf| {
log::debug!("{}: write {:?} s: {:?}", item.tag(), item.fd, buf.len()); log::debug!("{}: write {:?} s: {:?}", item.tag(), item.fd, buf.len());
syscall!(break libc::write(item.fd, buf[..].as_ptr() as _, buf.len())) syscall!(break libc::write(item.fd, buf[..].as_ptr() as _, buf.len()))
}); });
if result.is_pending() { if result.is_pending() {
renew_ev.writable = true; renew.writable = true;
item.flags.insert(Flags::WR);
} else { } else {
item.remove(Flags::WR); item.flags.remove(Flags::WR);
} }
} else if item.contains(Flags::WR) { } else if item.flags.contains(Flags::WR) {
renew_ev.writable = true; renew.writable = true;
} }
self.inner // handle HUP
.api if ev.is_interrupt() {
.modify(item.fd, id as u32, renew_ev, PollMode::Oneshot); item.close(id as u32, &self.inner.api, None, false);
return;
}
if !item.flags.contains(Flags::CLOSED | Flags::FAILED) {
self.inner
.api
.modify(item.fd, id as u32, renew, PollMode::Oneshot);
}
// delayed drops // delayed drops
if self.inner.delayd_drop.get() { if self.inner.delayd_drop.get() {
@ -190,14 +155,12 @@ impl<T> Handler for StreamOpsHandler<T> {
if item.ref_count == 0 { if item.ref_count == 0 {
let mut item = streams.remove(id as usize); let mut item = streams.remove(id as usize);
log::debug!( log::debug!(
"{}: Drop ({}), {:?}, has-io: {}", "{}: Drop ({:?}), flags: {:?}",
item.tag(), item.tag(),
id,
item.fd, item.fd,
item.io.is_some() item.flags
); );
item.context.stopped(None); item.close(id, &self.inner.api, None, true);
close(id, &mut item, &self.inner.api);
} }
} }
self.inner.delayd_drop.set(false); self.inner.delayd_drop.set(false);
@ -215,18 +178,16 @@ impl<T> Handler for StreamOpsHandler<T> {
item.fd, item.fd,
err err
); );
item.insert(Flags::ERR); item.close(id as u32, &self.inner.api, Some(err), false);
item.context.stopped(Some(err));
close(id as u32, item, &self.inner.api);
} }
}) })
} }
} }
impl<T> StreamOpsInner<T> { impl StreamOpsInner {
fn with<F, R>(&self, f: F) -> R fn with<F, R>(&self, f: F) -> R
where where
F: FnOnce(&mut Slab<StreamItem<T>>) -> R, F: FnOnce(&mut Slab<StreamItem>) -> R,
{ {
let mut streams = self.streams.take().unwrap(); let mut streams = self.streams.take().unwrap();
let result = f(&mut streams); let result = f(&mut streams);
@ -235,110 +196,112 @@ impl<T> StreamOpsInner<T> {
} }
} }
fn read<T>( impl StreamItem {
item: &StreamItem<T>, fn tag(&self) -> &'static str {
buf: &mut BytesVec, self.context.tag()
hw: usize,
lw: usize,
) -> Poll<io::Result<usize>> {
log::debug!(
"{}: reading fd ({:?}) flags: {:?}",
item.tag(),
item.fd,
item.context.flags()
);
if item.contains(Flags::RDSH) {
return Poll::Ready(Ok(0));
} }
let mut total = 0; fn read(&mut self) -> Poll<()> {
loop { let mut flags = self.flags;
// make sure we've got room let result = self.context.with_read_buf(|buf, hw, lw| {
let remaining = buf.remaining_mut(); // prev call result is 0
if remaining < lw { if flags.contains(Flags::RDSH) {
buf.reserve(hw - remaining); return Poll::Ready(Ok(0));
}
let mut total = 0;
loop {
// make sure we've got room
let remaining = buf.remaining_mut();
if remaining < lw {
buf.reserve(hw - remaining);
}
let chunk = buf.chunk_mut();
let chunk_len = chunk.len();
let chunk_ptr = chunk.as_mut_ptr();
let result =
syscall!(break libc::read(self.fd, chunk_ptr as _, chunk.len()));
if let Poll::Ready(Ok(size)) = result {
unsafe { buf.advance_mut(size) };
total += size;
if size == chunk_len {
continue;
}
}
log::debug!(
"{}: read fd ({:?}), s: {:?}, cap: {:?}, result: {:?}",
self.tag(),
self.fd,
total,
buf.remaining_mut(),
result
);
return match result {
Poll::Ready(Err(err)) => {
flags.insert(Flags::FAILED);
if total > 0 {
self.context.stopped(Some(err));
Poll::Ready(Ok(total))
} else {
Poll::Ready(Err(err))
}
}
Poll::Ready(Ok(size)) => {
if size == 0 {
flags.insert(Flags::RDSH);
}
Poll::Ready(Ok(total))
}
Poll::Pending => {
if total > 0 {
Poll::Ready(Ok(total))
} else {
Poll::Pending
}
}
};
}
});
self.flags = flags;
result
}
fn close(
&mut self,
id: u32,
api: &DriverApi,
error: Option<io::Error>,
shutdown: bool,
) -> Option<ntex_rt::JoinHandle<io::Result<i32>>> {
if !self.flags.contains(Flags::CLOSED) {
log::debug!("{}: Closing ({}), {:?}", self.tag(), id, self.fd);
self.flags.insert(Flags::CLOSED);
self.context.stopped(error);
let fd = self.fd;
api.detach(fd, id);
Some(ntex_rt::spawn_blocking(move || {
if shutdown {
let _ = syscall!(libc::shutdown(fd, libc::SHUT_RDWR));
}
syscall!(libc::close(fd))
}))
} else {
None
} }
let chunk = buf.chunk_mut();
let chunk_len = chunk.len();
let chunk_ptr = chunk.as_mut_ptr();
let result = syscall!(break libc::read(item.fd, chunk_ptr as _, chunk.len()));
if let Poll::Ready(Ok(size)) = result {
unsafe { buf.advance_mut(size) };
total += size;
if size == chunk_len {
continue;
}
}
log::debug!(
"{}: read fd ({:?}), s: {:?}, cap: {:?}, result: {:?}",
item.tag(),
item.fd,
total,
buf.remaining_mut(),
result
);
return match result {
Poll::Ready(Err(err)) => {
item.insert(Flags::ERR);
if total > 0 {
item.context.stopped(Some(err));
Poll::Ready(Ok(total))
} else {
Poll::Ready(Err(err))
}
}
Poll::Ready(Ok(size)) => {
if size == 0 {
item.insert(Flags::RDSH);
item.context.stopped(None);
}
Poll::Ready(Ok(total))
}
Poll::Pending => {
if total > 0 {
Poll::Ready(Ok(total))
} else {
Poll::Pending
}
}
};
} }
} }
fn close<T>( impl StreamCtl {
id: u32,
item: &mut StreamItem<T>,
api: &DriverApi,
) -> Option<ntex_rt::JoinHandle<io::Result<i32>>> {
if let Some(io) = item.io.take() {
log::debug!("{}: Closing ({}), {:?}", item.tag(), id, item.fd);
mem::forget(io);
let fd = item.fd;
let shutdown = !item.flags.get().intersects(Flags::ERR | Flags::RDSH);
api.detach(fd, id);
Some(ntex_rt::spawn_blocking(move || {
if shutdown {
let _ = syscall!(libc::shutdown(fd, libc::SHUT_RDWR));
}
syscall!(libc::close(fd))
}))
} else {
None
}
}
impl<T> StreamCtl<T> {
pub(crate) fn close(self) -> impl Future<Output = io::Result<()>> { pub(crate) fn close(self) -> impl Future<Output = io::Result<()>> {
let id = self.id as usize; let id = self.id as usize;
let fut = self.inner.with(|streams| { let fut = self
let item = &mut streams[id]; .inner
item.context.stopped(None); .with(|streams| streams[id].close(self.id, &self.inner.api, None, true));
close(self.id, item, &self.inner.api)
});
async move { async move {
if let Some(fut) = fut { if let Some(fut) = fut {
fut.await fut.await
@ -349,52 +312,42 @@ impl<T> StreamCtl<T> {
} }
} }
pub(crate) fn with_io<F, R>(&self, f: F) -> R
where
F: FnOnce(Option<&T>) -> R,
{
self.inner
.with(|streams| f(streams[self.id as usize].io.as_ref()))
}
pub(crate) fn modify(&self, rd: bool, wr: bool) -> bool { pub(crate) fn modify(&self, rd: bool, wr: bool) -> bool {
self.inner.with(|streams| { self.inner.with(|streams| {
let item = &mut streams[self.id as usize]; let item = &mut streams[self.id as usize];
if item.io.is_none() || item.contains(Flags::ERR) { if item.flags.contains(Flags::CLOSED) {
return false; return false;
} }
log::debug!( log::debug!(
"{}: Modify interest ({}), {:?} rd: {:?}, wr: {:?}, flags: {:?}", "{}: Modify interest ({:?}) rd: {:?}, wr: {:?}",
item.tag(), item.tag(),
self.id,
item.fd, item.fd,
rd, rd,
wr, wr
item.flags
); );
let mut changed = false; let mut changed = false;
let mut event = Event::new(0, false, false).with_interrupt(); let mut event = Event::new(0, false, false).with_interrupt();
if rd { if rd {
if item.contains(Flags::RD) { if item.flags.contains(Flags::RD) {
event.readable = true; event.readable = true;
} else { } else {
let res = item let res = item.read();
.context
.with_read_buf(|buf, hw, lw| read(item, buf, hw, lw));
if res.is_pending() && item.context.is_read_ready() { if res.is_pending() && item.context.is_read_ready() {
changed = true; changed = true;
event.readable = true; event.readable = true;
item.insert(Flags::RD); item.flags.insert(Flags::RD);
} }
} }
} else if item.flags.contains(Flags::RD) {
changed = true;
item.flags.remove(Flags::RD);
} }
if wr { if wr {
if item.contains(Flags::WR) { if item.flags.contains(Flags::WR) {
event.writable = true; event.writable = true;
} else { } else {
let result = item.context.with_write_buf(|buf| { let result = item.context.with_write_buf(|buf| {
@ -412,12 +365,15 @@ impl<T> StreamCtl<T> {
if result.is_pending() { if result.is_pending() {
changed = true; changed = true;
event.writable = true; event.writable = true;
item.insert(Flags::WR); item.flags.insert(Flags::WR);
} }
} }
} else if item.flags.contains(Flags::WR) {
changed = true;
item.flags.remove(Flags::WR);
} }
if changed { if changed && !item.flags.contains(Flags::CLOSED | Flags::FAILED) {
self.inner self.inner
.api .api
.modify(item.fd, self.id, event, PollMode::Oneshot); .modify(item.fd, self.id, event, PollMode::Oneshot);
@ -427,7 +383,7 @@ impl<T> StreamCtl<T> {
} }
} }
impl<T> Clone for StreamCtl<T> { impl Clone for StreamCtl {
fn clone(&self) -> Self { fn clone(&self) -> Self {
self.inner.with(|streams| { self.inner.with(|streams| {
streams[self.id as usize].ref_count += 1; streams[self.id as usize].ref_count += 1;
@ -439,7 +395,7 @@ impl<T> Clone for StreamCtl<T> {
} }
} }
impl<T> Drop for StreamCtl<T> { impl Drop for StreamCtl {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(mut streams) = self.inner.streams.take() { if let Some(mut streams) = self.inner.streams.take() {
let id = self.id as usize; let id = self.id as usize;
@ -447,14 +403,12 @@ impl<T> Drop for StreamCtl<T> {
if streams[id].ref_count == 0 { if streams[id].ref_count == 0 {
let mut item = streams.remove(id); let mut item = streams.remove(id);
log::debug!( log::debug!(
"{}: Drop io ({}), {:?}, has-io: {}", "{}: Drop io ({:?}), flags: {:?}",
item.tag(), item.tag(),
self.id,
item.fd, item.fd,
item.io.is_some() item.flags
); );
item.context.stopped(None); item.close(self.id, &self.inner.api, None, true);
close(self.id, &mut item, &self.inner.api);
} }
self.inner.streams.set(Some(streams)); self.inner.streams.set(Some(streams));
} else { } else {

View file

@ -1,4 +1,4 @@
use std::{any, future::poll_fn, task::Poll}; use std::{any, future::poll_fn, mem, os::fd::AsRawFd, task::Poll};
use ntex_io::{ use ntex_io::{
types, Handle, IoContext, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus, types, Handle, IoContext, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus,
@ -12,11 +12,10 @@ impl IoStream for super::TcpStream {
fn start(self, read: ReadContext, _: WriteContext) -> Option<Box<dyn Handle>> { fn start(self, read: ReadContext, _: WriteContext) -> Option<Box<dyn Handle>> {
let io = self.0; let io = self.0;
let context = read.context(); let context = read.context();
let ctl = StreamOps::current().register(io, context.clone()); let ctl = StreamOps::current().register(io.as_raw_fd(), context.clone());
let ctl2 = ctl.clone();
spawn(async move { run(ctl, context).await }); spawn(async move { run(ctl, context).await });
Some(Box::new(HandleWrapper(ctl2))) Some(Box::new(HandleWrapper(Some(io))))
} }
} }
@ -24,19 +23,20 @@ impl IoStream for super::UnixStream {
fn start(self, read: ReadContext, _: WriteContext) -> Option<Box<dyn Handle>> { fn start(self, read: ReadContext, _: WriteContext) -> Option<Box<dyn Handle>> {
let io = self.0; let io = self.0;
let context = read.context(); let context = read.context();
let ctl = StreamOps::current().register(io, context.clone()); let ctl = StreamOps::current().register(io.as_raw_fd(), context.clone());
spawn(async move { run(ctl, context).await }); spawn(async move { run(ctl, context).await });
mem::forget(io);
None None
} }
} }
struct HandleWrapper(StreamCtl<Socket>); struct HandleWrapper(Option<Socket>);
impl Handle for HandleWrapper { impl Handle for HandleWrapper {
fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> { fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
if id == any::TypeId::of::<types::PeerAddr>() { if id == any::TypeId::of::<types::PeerAddr>() {
let addr = self.0.with_io(|io| io.and_then(|io| io.peer_addr().ok())); let addr = self.0.as_ref().unwrap().peer_addr().ok();
if let Some(addr) = addr.and_then(|addr| addr.as_socket()) { if let Some(addr) = addr.and_then(|addr| addr.as_socket()) {
return Some(Box::new(types::PeerAddr(addr))); return Some(Box::new(types::PeerAddr(addr)));
} }
@ -45,13 +45,19 @@ impl Handle for HandleWrapper {
} }
} }
impl Drop for HandleWrapper {
fn drop(&mut self) {
mem::forget(self.0.take());
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)] #[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum Status { enum Status {
Shutdown, Shutdown,
Terminate, Terminate,
} }
async fn run<T>(ctl: StreamCtl<T>, context: IoContext) { async fn run(ctl: StreamCtl, context: IoContext) {
// Handle io read readiness // Handle io read readiness
let st = poll_fn(|cx| { let st = poll_fn(|cx| {
let mut modify = false; let mut modify = false;
@ -98,8 +104,9 @@ async fn run<T>(ctl: StreamCtl<T>, context: IoContext) {
.await; .await;
if st != Status::Terminate { if st != Status::Terminate {
ctl.modify(false, true); if ctl.modify(false, true) {
context.shutdown(st == Status::Shutdown).await; context.shutdown(st == Status::Shutdown).await;
}
} }
context.stopped(ctl.close().await.err()); context.stopped(ctl.close().await.err());
} }

View file

@ -71,7 +71,7 @@ pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result<Io> {
#[doc(hidden)] #[doc(hidden)]
/// Get number of active Io objects /// Get number of active Io objects
pub fn active_stream_ops() -> usize { pub fn active_stream_ops() -> usize {
self::driver::StreamOps::<socket2::Socket>::active_ops() self::driver::StreamOps::active_ops()
} }
#[cfg(all(target_os = "linux", feature = "neon"))] #[cfg(all(target_os = "linux", feature = "neon"))]

View file

@ -124,6 +124,10 @@ impl<T: os::fd::AsRawFd + 'static> StreamOps<T> {
} }
} }
pub(crate) fn active_ops() -> usize {
Self::current().with(|st| st.streams.len())
}
fn with<F, R>(&self, f: F) -> R fn with<F, R>(&self, f: F) -> R
where where
F: FnOnce(&mut StreamOpsStorage<T>) -> R, F: FnOnce(&mut StreamOpsStorage<T>) -> R,

View file

@ -64,3 +64,9 @@ pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result<Io> {
Socket::from(stream), Socket::from(stream),
)?))) )?)))
} }
#[doc(hidden)]
/// Get number of active Io objects
pub fn active_stream_ops() -> usize {
self::driver::StreamOps::<socket2::Socket>::active_ops()
}