uring driver impl

This commit is contained in:
Nikolay Kim 2025-03-11 14:09:07 +05:00
parent 4833f8dc17
commit a9c0348b98
24 changed files with 750 additions and 420 deletions

View file

@ -35,6 +35,9 @@ jobs:
- name: Code coverage (neon)
run: cargo llvm-cov --no-report --all --no-default-features --features="ntex/neon,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli"
- name: Code coverage (neon-uring)
run: cargo llvm-cov --no-report --all --no-default-features --features="ntex/neon-uring,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli"
- name: Generate coverage report
run: cargo llvm-cov report --lcov --output-path lcov.info --ignore-filename-regex="ntex-compio|ntex-tokio"

View file

@ -59,6 +59,11 @@ jobs:
run: |
cargo test --all --no-default-features --features="ntex/neon,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli"
- name: Run tests (neon-uring)
timeout-minutes: 40
run: |
cargo test --all --no-default-features --features="ntex/neon-uring,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli"
- name: Install cargo-cache
continue-on-error: true
run: |

View file

@ -25,7 +25,7 @@
## Usage
ntex supports multiple async runtimes, runtime must be selected as a feature. Available options are `compio`, `tokio`,
`neon` or `neon-uring`.
```toml
[dependencies]

View file

@ -60,6 +60,10 @@ impl Flags {
self.contains(Flags::BUF_R_READY)
}
/// Check whether the dispatcher asked to be notified about new read data
/// (the `RD_NOTIFY` flag is set).
pub(crate) fn is_waiting_for_read(&self) -> bool {
    self.contains(Flags::RD_NOTIFY)
}
/// Check whether reading is currently blocked: reads are paused
/// (`RD_PAUSED`) or the read buffer hit its high watermark (`BUF_R_FULL`).
pub(crate) fn cannot_read(self) -> bool {
    self.intersects(Flags::RD_PAUSED | Flags::BUF_R_FULL)
}

View file

@ -437,7 +437,7 @@ impl<F> Io<F> {
} else {
st.dispatch_task.register(cx.waker());
let ready = flags.contains(Flags::BUF_R_READY);
let ready = flags.is_read_buf_ready();
if flags.cannot_read() {
flags.cleanup_read_flags();
st.read_task.wake();
@ -558,24 +558,28 @@ impl<F> Io<F> {
let st = self.st();
let flags = self.flags();
if flags.is_stopped() {
Poll::Ready(Err(st.error_or_disconnected()))
} else {
let len = st.buffer.write_destination_size();
if len > 0 {
if full {
st.insert_flags(Flags::BUF_W_MUST_FLUSH);
st.dispatch_task.register(cx.waker());
return Poll::Pending;
} else if len >= st.pool.get().write_params_high() << 1 {
st.insert_flags(Flags::BUF_W_BACKPRESSURE);
st.dispatch_task.register(cx.waker());
return Poll::Pending;
}
let len = st.buffer.write_destination_size();
if len > 0 {
if full {
st.insert_flags(Flags::BUF_W_MUST_FLUSH);
st.dispatch_task.register(cx.waker());
return if flags.is_stopped() {
Poll::Ready(Err(st.error_or_disconnected()))
} else {
Poll::Pending
};
} else if len >= st.pool.get().write_params_high() << 1 {
st.insert_flags(Flags::BUF_W_BACKPRESSURE);
st.dispatch_task.register(cx.waker());
return if flags.is_stopped() {
Poll::Ready(Err(st.error_or_disconnected()))
} else {
Poll::Pending
};
}
st.remove_flags(Flags::BUF_W_MUST_FLUSH | Flags::BUF_W_BACKPRESSURE);
Poll::Ready(Ok(()))
}
st.remove_flags(Flags::BUF_W_MUST_FLUSH | Flags::BUF_W_BACKPRESSURE);
Poll::Ready(Ok(()))
}
#[inline]

View file

@ -128,7 +128,7 @@ impl ReadContext {
);
// dest buffer has new data, wake up dispatcher
inner.dispatch_task.wake();
} else if inner.flags.get().contains(Flags::RD_NOTIFY) {
} else if inner.flags.get().is_waiting_for_read() {
// in case of "notify" we must wake up dispatch task
// if we read any data from source
inner.dispatch_task.wake();
@ -448,17 +448,276 @@ impl IoContext {
}
/// Get read buffer
pub fn get_read_buf(&self) -> BytesVec {
pub fn get_read_buf(&self) -> Poll<BytesVec> {
let inner = &self.0 .0;
if inner.flags.get().is_read_buf_ready() {
// read buffer is still not read by dispatcher
// we cannot touch it
inner.pool.get().get_read_buf()
if let Some(waker) = inner.read_task.take() {
let mut cx = Context::from_waker(&waker);
if let Poll::Ready(ReadStatus::Ready) = self.0.filter().poll_read_ready(&mut cx)
{
let mut buf = if inner.flags.get().is_read_buf_ready() {
// read buffer is still not read by dispatcher
// we cannot touch it
inner.pool.get().get_read_buf()
} else {
inner
.buffer
.get_read_source()
.unwrap_or_else(|| inner.pool.get().get_read_buf())
};
// make sure we've got room
let (hw, lw) = self.0.memory_pool().read_params().unpack();
let remaining = buf.remaining_mut();
if remaining < lw {
buf.reserve(hw - remaining);
}
return Poll::Ready(buf);
}
}
Poll::Pending
}
/// Return an unused read buffer to the io context.
///
/// If a read-source buffer already exists, the released bytes are appended to
/// it; otherwise `buf` itself becomes the new read source.
///
/// Fix: the source contained stray leftover lines (an
/// `inner.buffer.get_read_source().unwrap_or_else(...)` expression with no
/// semicolon, residue from an adjacent diff hunk) inside the `else` branch,
/// which made it syntactically invalid. They have been removed.
pub fn release_read_buf(&self, buf: BytesVec) {
    let inner = &self.0 .0;
    if let Some(mut first_buf) = inner.buffer.get_read_source() {
        first_buf.extend_from_slice(&buf);
        inner.buffer.set_read_source(&self.0, first_buf);
    } else {
        inner.buffer.set_read_source(&self.0, buf);
    }
}
/// Hand the result of a completed read operation back to the io context.
///
/// `buf` is merged into the read-source buffer, then `result` is interpreted:
/// `Ok(0)` means the peer disconnected and `Err(_)` an io error — both stop
/// the io and return `Poll::Ready(())`; `Ok(n)` runs the filter chain over
/// the new bytes, updates back-pressure flags and wakes the relevant tasks,
/// returning `Poll::Pending` unless the filter fails.
pub fn set_read_buf(&self, result: io::Result<usize>, buf: BytesVec) -> Poll<()> {
    let inner = &self.0 .0;
    let (hw, _) = self.0.memory_pool().read_params().unpack();

    // merge the newly filled buffer into the existing read source (if any)
    if let Some(mut first_buf) = inner.buffer.get_read_source() {
        first_buf.extend_from_slice(&buf);
        inner.buffer.set_read_source(&self.0, first_buf);
    } else {
        inner.buffer.set_read_source(&self.0, buf);
    }

    match result {
        Ok(0) => {
            // zero-byte read: peer closed the connection
            inner.io_stopped(None);
            Poll::Ready(())
        }
        Ok(nbytes) => {
            let filter = self.0.filter();
            let res = filter
                .process_read_buf(&self.0, &inner.buffer, 0, nbytes)
                .and_then(|status| {
                    if status.nbytes > 0 {
                        // dest buffer has new data, wake up dispatcher
                        if inner.buffer.read_destination_size() >= hw {
                            log::trace!(
                                "{}: Io read buffer is too large {}, enable read back-pressure",
                                self.0.tag(),
                                nbytes
                            );
                            inner.insert_flags(Flags::BUF_R_READY | Flags::BUF_R_FULL);
                        } else {
                            inner.insert_flags(Flags::BUF_R_READY);

                            if nbytes >= hw {
                                // read task is paused because of read back-pressure
                                // but there is no new data in top most read buffer
                                // so we need to wake up read task to read more data
                                // otherwise read task would sleep forever
                                inner.read_task.wake();
                            }
                        }
                        log::trace!(
                            "{}: New {} bytes available, wakeup dispatcher",
                            self.0.tag(),
                            nbytes
                        );
                        inner.dispatch_task.wake();
                    } else {
                        if nbytes >= hw {
                            // read task is paused because of read back-pressure
                            // but there is no new data in top most read buffer
                            // so we need to wake up read task to read more data
                            // otherwise read task would sleep forever
                            inner.read_task.wake();
                        }
                        if inner.flags.get().is_waiting_for_read() {
                            // in case of "notify" we must wake up dispatch task
                            // if we read any data from source
                            inner.dispatch_task.wake();
                        }
                    }

                    // while reading, filter wrote some data
                    // in that case filters need to process write buffers
                    // and potentially wake write task
                    if status.need_write {
                        inner.write_task.wake();
                        filter.process_write_buf(&self.0, &inner.buffer, 0)
                    } else {
                        Ok(())
                    }
                });

            if let Err(err) = res {
                inner.io_stopped(Some(err));
                Poll::Ready(())
            } else {
                self.shutdown_filters();
                Poll::Pending
            }
        }
        Err(e) => {
            inner.io_stopped(Some(e));
            Poll::Ready(())
        }
    }
}
/// Get the buffered write data for the io task.
///
/// Returns `Poll::Ready(buf)` only when the filter reports write readiness
/// (or shutdown) and a non-empty destination buffer is queued; otherwise
/// `Poll::Pending`. The write-task waker is consumed to poll readiness.
pub fn get_write_buf(&self) -> Poll<BytesVec> {
    let inner = &self.0 .0;

    // check write readiness
    if let Some(waker) = inner.write_task.take() {
        let ready = self
            .0
            .filter()
            .poll_write_ready(&mut Context::from_waker(&waker));
        let buf = if matches!(
            ready,
            Poll::Ready(WriteStatus::Ready | WriteStatus::Shutdown)
        ) {
            // only hand out a buffer that actually contains data
            // NOTE(review): an empty buffer taken here appears to be dropped —
            // confirm `get_write_destination` removal semantics.
            inner.buffer.get_write_destination().and_then(|buf| {
                if buf.is_empty() {
                    None
                } else {
                    Some(buf)
                }
            })
        } else {
            None
        };

        if let Some(buf) = buf {
            return Poll::Ready(buf);
        }
    }
    Poll::Pending
}
/// Return an unflushed write buffer back to the io context.
///
/// Any bytes queued in the destination while the op was in flight are
/// appended after `buf` (so `buf`'s bytes stay first in flush order), then
/// flow-control flags are updated: empty queue → writer paused and a waiting
/// dispatcher is woken; shrunk below 2×high-watermark → back-pressure lifted.
pub fn release_write_buf(&self, mut buf: BytesVec) {
    let inner = &self.0 .0;

    if let Some(b) = inner.buffer.get_write_destination() {
        buf.extend_from_slice(&b);
        self.0.memory_pool().release_write_buf(b);
    }
    inner.buffer.set_write_destination(buf);

    // if write buffer is smaller than high watermark value, turn off back-pressure
    let len = inner.buffer.write_destination_size();
    let mut flags = inner.flags.get();

    if len == 0 {
        if flags.is_waiting_for_write() {
            flags.waiting_for_write_is_done();
            inner.dispatch_task.wake();
        }
        flags.insert(Flags::WR_PAUSED);
        inner.flags.set(flags);
    } else if flags.contains(Flags::BUF_W_BACKPRESSURE)
        && len < inner.pool.get().write_params_high() << 1
    {
        flags.remove(Flags::BUF_W_BACKPRESSURE);
        inner.flags.set(flags);
        inner.dispatch_task.wake();
    }
    // NOTE(review): this trailing set repeats the branch-local sets above and
    // is a no-op when neither branch ran — looks redundant; confirm and drop.
    inner.flags.set(flags);
}
/// Hand the result of a completed write operation back to the io context.
///
/// The kernel result is first normalized into `Ok(remaining-bytes)` /
/// `Err(_)` (`Ok(0)` from the kernel is treated as a disconnect). Any
/// unwritten tail of `buf` is re-queued in front of data buffered while the
/// op was in flight, then flow-control flags are updated. Returns
/// `Poll::Ready(())` when everything is flushed or the io stopped, otherwise
/// `Poll::Pending`.
pub fn set_write_buf(&self, result: io::Result<usize>, mut buf: BytesVec) -> Poll<()> {
    // normalize the kernel result into Ok(bytes-still-pending-in-buf) / Err
    let result = match result {
        Ok(0) => {
            log::trace!("{}: Disconnected during flush", self.tag());
            Err(io::Error::new(
                io::ErrorKind::WriteZero,
                "failed to write frame to transport",
            ))
        }
        Ok(n) => {
            if n == buf.len() {
                buf.clear();
                Ok(0)
            } else {
                buf.advance(n);
                Ok(buf.len())
            }
        }
        Err(e) => Err(e),
    };

    let inner = &self.0 .0;

    // set buffer back
    let result = match result {
        Ok(0) => {
            // log::debug!("{}: WROTE ALL {:?}", self.0.tag(), inner.buffer.write_destination_size());
            self.0.memory_pool().release_write_buf(buf);
            Ok(inner.buffer.write_destination_size())
        }
        Ok(_) => {
            // partial write: keep the unwritten tail first, then append any
            // data queued while the op was in flight, and re-queue the lot
            if let Some(b) = inner.buffer.get_write_destination() {
                buf.extend_from_slice(&b);
                self.0.memory_pool().release_write_buf(b);
            }
            let l = buf.len();
            // log::debug!("{}: WROTE SOME {:?}", self.0.tag(), l);
            inner.buffer.set_write_destination(buf);
            Ok(l)
        }
        Err(e) => Err(e),
    };

    let mut flags = inner.flags.get();
    match result {
        Ok(0) => {
            // all data has been written
            flags.insert(Flags::WR_PAUSED);

            if flags.is_task_waiting_for_write() {
                flags.task_waiting_for_write_is_done();
                inner.write_task.wake();
            }

            if flags.is_waiting_for_write() {
                flags.waiting_for_write_is_done();
                inner.dispatch_task.wake();
            }

            inner.flags.set(flags);
            Poll::Ready(())
        }
        Ok(len) => {
            // if write buffer is smaller than high watermark value, turn off back-pressure
            if flags.contains(Flags::BUF_W_BACKPRESSURE)
                && len < inner.pool.get().write_params_high() << 1
            {
                flags.remove(Flags::BUF_W_BACKPRESSURE);
                inner.flags.set(flags);
                inner.dispatch_task.wake();
            }
            Poll::Pending
        }
        Err(e) => {
            inner.io_stopped(Some(e));
            Poll::Ready(())
        }
    }
}
@ -524,7 +783,7 @@ impl IoContext {
// otherwise read task would sleep forever
inner.read_task.wake();
}
if inner.flags.get().contains(Flags::RD_NOTIFY) {
if inner.flags.get().is_waiting_for_read() {
// in case of "notify" we must wake up dispatch task
// if we read any data from source
inner.dispatch_task.wake();

View file

@ -37,24 +37,6 @@ pub use asyncify::*;
mod driver_type;
pub use driver_type::*;
thread_local! {
    // Per-thread switch consulted by `log`; stays off until
    // `enable_logging` is called on the same thread.
    static LOGGING: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
}

/// Turn on logging for the calling thread.
pub fn enable_logging() {
    LOGGING.with(|cell| cell.set(true));
}
/// enable logging for thread
pub fn log<T: AsRef<str>>(s: T) {
LOGGING.with(|_v| {
//if _v.get() {
println!("{}", s.as_ref());
//}
});
}
cfg_if::cfg_if! {
//if #[cfg(windows)] {
// #[path = "iocp/mod.rs"]

View file

@ -4,12 +4,10 @@
//! The operation itself doesn't perform anything.
//! You need to pass them to [`crate::Proactor`], and poll the driver.
use std::{marker::PhantomPinned, mem::ManuallyDrop, net::Shutdown};
use std::{marker::PhantomPinned, net::Shutdown};
#[cfg(unix)]
pub use super::sys::op::{CreateSocket, Handler, Interest};
use super::OwnedFd;
pub use super::sys::op::*;
/// Spawn a blocking function in the thread pool.
pub struct Asyncify<F, D> {
@ -45,17 +43,3 @@ impl<S> ShutdownSocket<S> {
Self { fd, how }
}
}
/// Close socket fd.
pub struct CloseSocket {
    // Wrapped in `ManuallyDrop` so dropping the op does not close the fd —
    // presumably the close is performed by the driver itself; TODO confirm.
    pub(crate) fd: ManuallyDrop<OwnedFd>,
}

impl CloseSocket {
    /// Create [`CloseSocket`]; takes ownership of `fd`.
    pub fn new(fd: OwnedFd) -> Self {
        Self {
            fd: ManuallyDrop::new(fd),
        }
    }
}

View file

@ -1,8 +1,8 @@
use std::{io, marker::Send, os::fd::FromRawFd, os::fd::RawFd, pin::Pin, task::Poll};
use std::{io, marker::Send, mem, os::fd::FromRawFd, os::fd::RawFd, pin::Pin, task::Poll};
pub use crate::driver::unix::op::*;
use super::{AsRawFd, Decision, OpCode};
use super::{AsRawFd, Decision, OpCode, OwnedFd};
use crate::{driver::op::*, syscall};
pub trait Handler {
@ -41,6 +41,20 @@ where
}
}
/// Close socket fd.
pub struct CloseSocket {
    // Wrapped in `mem::ManuallyDrop` so dropping the op does not close the
    // fd — presumably the close syscall is issued by the driver; TODO confirm.
    pub(crate) fd: mem::ManuallyDrop<OwnedFd>,
}

impl CloseSocket {
    /// Create [`CloseSocket`]; takes ownership of `fd`.
    pub fn new(fd: OwnedFd) -> Self {
        Self {
            fd: mem::ManuallyDrop::new(fd),
        }
    }
}
impl OpCode for CreateSocket {
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
Ok(Decision::Blocking)

View file

@ -22,6 +22,9 @@ use crate::driver::{sys, AsyncifyPool, Entry, Key, ProactorBuilder};
/// Abstraction of io-uring operations.
pub trait OpCode {
/// Name of the operation
fn name(&self) -> &'static str;
/// Call the operation in a blocking way. This method will only be called if
/// [`create_entry`] returns [`OpEntry::Blocking`].
fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
@ -41,7 +44,7 @@ pub trait OpCode {
#[derive(Debug)]
enum Change {
Submit { entry: SEntry },
Cancel { user_data: u64, op_id: u64 },
Cancel { op_id: u64 },
}
pub struct DriverApi {
@ -53,7 +56,7 @@ impl DriverApi {
pub fn submit(&self, user_data: u32, entry: SEntry) {
log::debug!(
"Submit operation batch: {:?} user-data: {:?} entry: {:?}",
self.batch,
self.batch >> Driver::BATCH,
user_data,
entry,
);
@ -62,15 +65,14 @@ impl DriverApi {
});
}
pub fn cancel(&self, user_data: u32, op_id: u32) {
pub fn cancel(&self, op_id: u32) {
log::debug!(
"Cancel operation batch: {:?} user-data: {:?}",
self.batch,
user_data
self.batch >> Driver::BATCH,
op_id
);
self.changes.borrow_mut().push_back(Change::Cancel {
op_id: op_id as u64 | self.batch,
user_data: user_data as u64 | self.batch,
});
}
}
@ -89,8 +91,10 @@ pub(crate) struct Driver {
impl Driver {
const NOTIFY: u64 = u64::MAX;
const BATCH_MASK: u64 = 0xFFFF << 48;
const DATA_MASK: u64 = 0xFFFF >> 16;
const CANCEL: u64 = u64::MAX - 1;
const BATCH: u64 = 48;
const BATCH_MASK: u64 = 0xFFFF_0000_0000_0000;
const DATA_MASK: u64 = 0x0000_FFFF_FFFF_FFFF;
pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
log::trace!("New io-uring driver");
@ -120,7 +124,7 @@ impl Driver {
pool: builder.create_or_get_thread_pool(),
pool_completed: Arc::new(SegQueue::new()),
hid: Cell::new(1 << 48),
hid: Cell::new(0),
changes: Rc::new(RefCell::new(VecDeque::new())),
handlers: Cell::new(Some(Box::new(Vec::new()))),
})
@ -134,7 +138,7 @@ impl Driver {
let mut handlers = self.handlers.take().unwrap_or_default();
let api = DriverApi {
batch: id,
batch: id << 48,
changes: self.changes.clone(),
};
handlers.push(f(api));
@ -155,9 +159,9 @@ impl Driver {
ring.submit_and_wait(1)
}
};
log::trace!("Submit result: {res:?}");
match res {
Ok(_) => {
// log::debug!("Submit result: {res:?} {:?}", timeout);
if ring.completion().is_empty() {
Err(io::ErrorKind::TimedOut.into())
} else {
@ -165,7 +169,13 @@ impl Driver {
}
}
Err(e) => match e.raw_os_error() {
Some(libc::ETIME) => Err(io::ErrorKind::TimedOut.into()),
Some(libc::ETIME) => {
if timeout.is_some() && timeout != Some(Duration::ZERO) {
Err(io::ErrorKind::TimedOut.into())
} else {
Ok(())
}
}
Some(libc::EBUSY) | Some(libc::EAGAIN) => {
Err(io::ErrorKind::Interrupted.into())
}
@ -196,10 +206,10 @@ impl Driver {
break;
}
}
Change::Cancel { user_data, op_id } => {
let entry = AsyncCancel::new(op_id).build().user_data(user_data);
if unsafe { squeue.push(&entry.user_data(user_data)) }.is_err() {
changes.push_front(Change::Cancel { user_data, op_id });
Change::Cancel { op_id } => {
let entry = AsyncCancel::new(op_id).build().user_data(Self::CANCEL);
if unsafe { squeue.push(&entry) }.is_err() {
changes.push_front(Change::Cancel { op_id });
break;
}
}
@ -215,12 +225,12 @@ impl Driver {
timeout: Option<Duration>,
f: F,
) -> io::Result<()> {
log::debug!("Start polling");
self.poll_blocking();
let has_more = self.apply_changes();
if !self.poll_completions() || has_more {
let has_more = self.apply_changes();
let poll_result = self.poll_completions();
if !poll_result || has_more {
if has_more {
self.submit_auto(Some(Duration::ZERO))?;
} else {
@ -235,7 +245,7 @@ impl Driver {
}
pub fn push(&self, op: &mut Key<dyn sys::OpCode>) -> Poll<io::Result<usize>> {
log::trace!("push RawOp");
log::trace!("Push op: {:?}", op.as_op_pin().name());
let user_data = op.user_data();
loop {
@ -259,27 +269,28 @@ impl Driver {
for entry in cqueue {
let user_data = entry.user_data();
match user_data {
Self::CANCEL => {}
Self::NOTIFY => {
let flags = entry.flags();
debug_assert!(more(flags));
self.notifier.clear().expect("cannot clear notifier");
}
_ => {
let batch = (user_data & Self::BATCH_MASK) as usize;
let batch = ((user_data & Self::BATCH_MASK) >> Self::BATCH) as usize;
let user_data = (user_data & Self::DATA_MASK) as usize;
let result = entry.result();
let result = if result < 0 {
let result = if result == -libc::ECANCELED {
libc::ETIMEDOUT
} else {
-result
};
Err(io::Error::from_raw_os_error(result))
if result == -libc::ECANCELED {
handlers[batch].canceled(user_data);
} else {
Ok(result as _)
};
handlers[batch].completed(user_data, entry.flags(), result);
let result = if result < 0 {
Err(io::Error::from_raw_os_error(result))
} else {
Ok(result as _)
};
handlers[batch].completed(user_data, entry.flags(), result);
}
}
}
}

View file

@ -1,4 +1,4 @@
use std::{io, os::fd::AsRawFd, os::fd::FromRawFd, os::fd::RawFd, pin::Pin};
use std::{io, os::fd::AsRawFd, pin::Pin};
pub use crate::driver::unix::op::*;
@ -8,6 +8,8 @@ use crate::{driver::op::*, syscall};
pub trait Handler {
/// Operation is completed
fn completed(&mut self, user_data: usize, flags: u32, result: io::Result<i32>);
fn canceled(&mut self, user_data: usize);
}
impl<D, F> OpCode for Asyncify<F, D>
@ -15,6 +17,10 @@ where
D: Send + 'static,
F: (FnOnce() -> (io::Result<usize>, D)) + Send + 'static,
{
fn name(&self) -> &'static str {
"Asyncify"
}
fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
// Safety: self won't be moved
let this = unsafe { self.get_unchecked_mut() };
@ -29,25 +35,21 @@ where
}
impl OpCode for CreateSocket {
    // Operation name used for tracing/logging.
    fn name(&self) -> &'static str {
        "CreateSocket"
    }

    // Blocking socket(2) call; returns the new fd as usize.
    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
        Ok(syscall!(libc::socket(self.domain, self.socket_type, self.protocol))? as _)
    }
}
impl<S: AsRawFd> OpCode for ShutdownSocket<S> {
    // Operation name used for tracing/logging.
    fn name(&self) -> &'static str {
        "ShutdownSocket"
    }

    // Blocking shutdown(2) on the wrapped fd with the configured direction.
    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
        Ok(syscall!(libc::shutdown(self.fd.as_raw_fd(), self.how()))? as _)
    }
}
impl CloseSocket {
    /// Build a close op from a raw fd.
    ///
    /// NOTE(review): the `unsafe` assumes `fd` is a valid, open descriptor
    /// whose ownership is transferred to this op — confirm at call sites.
    pub fn from_raw_fd(fd: RawFd) -> Self {
        Self::new(unsafe { FromRawFd::from_raw_fd(fd) })
    }
}
impl OpCode for CloseSocket {
    // Operation name used for tracing/logging. Added for consistency: every
    // other `OpCode` impl in this file (`Asyncify`, `CreateSocket`,
    // `ShutdownSocket`) implements `name()`, and the trait declares it
    // without a default body.
    fn name(&self) -> &'static str {
        "CloseSocket"
    }

    // Blocking close(2) of the wrapped fd.
    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
        Ok(syscall!(libc::close(self.fd.as_raw_fd()))? as _)
    }
}

View file

@ -1,10 +1,9 @@
#![allow(clippy::missing_safety_doc)]
use std::{future::Future, io, mem, mem::MaybeUninit};
use std::{io, mem::MaybeUninit};
use socket2::{Domain, Protocol, SockAddr, Socket as Socket2, Type};
use crate::driver::{op::CloseSocket, op::ShutdownSocket, AsRawFd};
use crate::{impl_raw_fd, syscall};
use crate::{driver::AsRawFd, impl_raw_fd, syscall};
#[derive(Debug)]
pub struct Socket {
@ -120,22 +119,6 @@ impl Socket {
Ok(socket)
}
pub fn close(self) -> impl Future<Output = io::Result<()>> {
let op = CloseSocket::from_raw_fd(self.as_raw_fd());
let fut = crate::submit(op);
mem::forget(self);
async move {
fut.await.0?;
Ok(())
}
}
pub async fn shutdown(&self) -> io::Result<()> {
let op = ShutdownSocket::new(self.as_raw_fd(), std::net::Shutdown::Write);
crate::submit(op).await.0?;
Ok(())
}
#[cfg(unix)]
pub unsafe fn get_socket_option<T: Copy>(
&self,

View file

@ -1,4 +1,4 @@
use std::{future::Future, io, net::SocketAddr};
use std::{io, net::SocketAddr};
use socket2::Socket as Socket2;
@ -26,11 +26,6 @@ impl TcpStream {
Self { inner }
}
/// Close the socket.
pub fn close(self) -> impl Future<Output = io::Result<()>> {
self.inner.close()
}
/// Returns the socket address of the remote peer of this TCP connection.
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.inner

View file

@ -1,4 +1,4 @@
use std::{future::Future, io};
use std::io;
use socket2::{SockAddr, Socket as Socket2};
@ -27,12 +27,6 @@ impl UnixStream {
Self { inner }
}
/// Close the socket. If the returned future is dropped before polling, the
/// socket won't be closed.
pub fn close(self) -> impl Future<Output = io::Result<()>> {
self.inner.close()
}
/// Returns the socket path of the remote peer of this connection.
pub fn peer_addr(&self) -> io::Result<SockAddr> {
#[allow(unused_mut)]

View file

@ -21,7 +21,9 @@ pub use crate::rt_polling::{
feature = "neon-uring",
not(feature = "neon"),
not(feature = "tokio"),
not(feature = "compio")
not(feature = "compio"),
target_os = "linux",
feature = "io-uring"
))]
pub use crate::rt_uring::{
from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect,

View file

@ -22,6 +22,8 @@ mod rt_polling;
feature = "neon-uring",
not(feature = "neon"),
not(feature = "tokio"),
not(feature = "compio")
not(feature = "compio"),
target_os = "linux",
feature = "io-uring"
))]
mod rt_uring;

View file

@ -1,6 +1,6 @@
use std::{cell::Cell, collections::VecDeque, fmt, io, ptr, rc::Rc, task, task::Poll};
use ntex_neon::driver::op::{Handler, Interest};
use ntex_neon::driver::op::{CloseSocket, Handler, Interest};
use ntex_neon::driver::{AsRawFd, DriverApi, RawFd};
use ntex_neon::{syscall, Runtime};
use slab::Slab;
@ -149,7 +149,8 @@ impl<T> Handler for StreamOpsHandler<T> {
.inspect(|size| {
unsafe { buf.advance_mut(*size) };
log::debug!(
"FD: {:?}, SIZE: {:?}, BUF: {:?}",
"{}: {:?}, SIZE: {:?}, BUF: {:?}",
item.context.tag(),
item.fd,
size,
buf
@ -193,10 +194,11 @@ impl<T> Handler for StreamOpsHandler<T> {
// extra
let mut feed = self.inner.feed.take().unwrap();
for id in feed.drain(..) {
log::debug!("Drop io ({}), {:?}", id, streams[id].fd);
let item = &mut streams[id];
log::debug!("{}: Drop io ({}), {:?}", item.context.tag(), id, item.fd);
streams[id].ref_count -= 1;
if streams[id].ref_count == 0 {
item.ref_count -= 1;
if item.ref_count == 0 {
let item = streams.remove(id);
if item.io.is_some() {
self.inner.api.unregister_all(item.fd);
@ -209,20 +211,17 @@ impl<T> Handler for StreamOpsHandler<T> {
}
}
/// Io objects whose underlying fd/socket can be closed asynchronously.
pub(crate) trait Closable {
    // Consumes the value; resolves once the close operation completes.
    async fn close(self) -> io::Result<()>;
}
impl<T> StreamCtl<T> {
pub(crate) async fn close(self) -> io::Result<()>
where
T: Closable,
{
if let Some(io) = self.with(|streams| streams[self.id].io.take()) {
io.close().await
} else {
Ok(())
pub(crate) async fn close(self) -> io::Result<()> {
let (io, fd) =
self.with(|streams| (streams[self.id].io.take(), streams[self.id].fd));
if let Some(io) = io {
let op = CloseSocket::from_raw_fd(fd);
let fut = ntex_neon::submit(op);
std::mem::forget(io);
fut.await.0?;
}
Ok(())
}
pub(crate) fn with_io<F, R>(&self, f: F) -> R
@ -237,7 +236,12 @@ impl<T> StreamCtl<T> {
let item = &mut streams[self.id];
if item.flags.intersects(Flags::RD | Flags::WR) {
log::debug!("Pause all io ({}), {:?}", self.id, item.fd);
log::debug!(
"{}: Pause all io ({}), {:?}",
item.context.tag(),
self.id,
item.fd
);
item.flags.remove(Flags::RD | Flags::WR);
self.inner.api.unregister_all(item.fd);
}
@ -248,7 +252,12 @@ impl<T> StreamCtl<T> {
self.with(|streams| {
let item = &mut streams[self.id];
log::debug!("Pause io read ({}), {:?}", self.id, item.fd);
log::debug!(
"{}: Pause io read ({}), {:?}",
item.context.tag(),
self.id,
item.fd
);
if item.flags.contains(Flags::RD) {
item.flags.remove(Flags::RD);
self.inner.api.unregister(item.fd, Interest::Readable);
@ -260,7 +269,12 @@ impl<T> StreamCtl<T> {
self.with(|streams| {
let item = &mut streams[self.id];
log::debug!("Resume io read ({}), {:?}", self.id, item.fd);
log::debug!(
"{}: Resume io read ({}), {:?}",
item.context.tag(),
self.id,
item.fd
);
if !item.flags.contains(Flags::RD) {
item.flags.insert(Flags::RD);
self.inner
@ -275,9 +289,19 @@ impl<T> StreamCtl<T> {
let item = &mut streams[self.id];
if !item.flags.contains(Flags::WR) {
log::debug!("Resume io write ({}), {:?}", self.id, item.fd);
log::debug!(
"{}: Resume io write ({}), {:?}",
item.context.tag(),
self.id,
item.fd
);
let result = item.context.with_write_buf(|buf| {
log::debug!("Writing io ({}), buf: {:?}", self.id, buf.len());
log::debug!(
"{}: Writing io ({}), buf: {:?}",
item.context.tag(),
self.id,
buf.len()
);
let slice = &buf[..];
syscall!(break libc::write(item.fd, slice.as_ptr() as _, slice.len()))
@ -285,7 +309,8 @@ impl<T> StreamCtl<T> {
if result.is_pending() {
log::debug!(
"Write is pending ({}), {:?}",
"{}: Write is pending ({}), {:?}",
item.context.tag(),
self.id,
item.context.flags()
);
@ -325,7 +350,12 @@ impl<T> Clone for StreamCtl<T> {
impl<T> Drop for StreamCtl<T> {
fn drop(&mut self) {
if let Some(mut streams) = self.inner.streams.take() {
log::debug!("Drop io ({}), {:?}", self.id, streams[self.id].fd);
log::debug!(
"{}: Drop io ({}), {:?}",
streams[self.id].context.tag(),
self.id,
streams[self.id].fd
);
streams[self.id].ref_count -= 1;
if streams[self.id].ref_count == 0 {

View file

@ -1,11 +1,11 @@
use std::{any, future::poll_fn, io, task::Poll};
use std::{any, future::poll_fn, task::Poll};
use ntex_io::{
types, Handle, IoContext, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus,
};
use ntex_neon::{net::TcpStream, net::UnixStream, spawn};
use ntex_neon::{net::TcpStream, spawn};
use super::driver::{Closable, StreamCtl, StreamOps};
use super::driver::{StreamCtl, StreamOps};
impl IoStream for super::TcpStream {
fn start(self, read: ReadContext, _: WriteContext) -> Option<Box<dyn Handle>> {
@ -44,25 +44,13 @@ impl Handle for HandleWrapper {
}
}
impl Closable for TcpStream {
    // Delegates to the inherent `TcpStream::close` (fully-qualified to avoid
    // resolving back into this trait method).
    async fn close(self) -> io::Result<()> {
        TcpStream::close(self).await
    }
}
impl Closable for UnixStream {
    // Delegates to the inherent `UnixStream::close` (fully-qualified to avoid
    // resolving back into this trait method).
    async fn close(self) -> io::Result<()> {
        UnixStream::close(self).await
    }
}
// Outcome requested by the dispatcher when io readiness polling ends:
// graceful shutdown vs immediate termination.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum Status {
    Shutdown,
    Terminate,
}
async fn run<T: Closable>(ctl: StreamCtl<T>, context: IoContext) {
async fn run<T>(ctl: StreamCtl<T>, context: IoContext) {
// Handle io read readiness
let st = poll_fn(|cx| {
let read = match context.poll_read_ready(cx) {

View file

@ -1,10 +1,11 @@
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::{cell::RefCell, collections::VecDeque, io, path::Path, rc::Rc, task::Poll};
use std::{cell::RefCell, io, path::Path, rc::Rc};
use ntex_neon::driver::op::{Handler, Interest};
use io_uring::{opcode, types::Fd};
use ntex_neon::driver::op::Handler;
use ntex_neon::driver::{AsRawFd, DriverApi, RawFd};
use ntex_neon::net::{Socket, TcpStream, UnixStream};
use ntex_neon::{syscall, Runtime};
use ntex_neon::Runtime;
use ntex_util::channel::oneshot::{channel, Sender};
use slab::Slab;
use socket2::{Protocol, SockAddr, Type};
@ -29,7 +30,7 @@ pub(crate) async fn connect(addr: SocketAddr) -> io::Result<TcpStream> {
let (sender, rx) = channel();
ConnectOps::current().connect(socket.as_raw_fd(), addr, sender)?;
ConnectOps::current().connect(socket.as_raw_fd(), addr, sender);
rx.await
.map_err(|_| io::Error::new(io::ErrorKind::Other, "IO Driver is gone"))
@ -54,7 +55,7 @@ pub(crate) async fn connect_unix(path: impl AsRef<Path>) -> io::Result<UnixStrea
let (sender, rx) = channel();
ConnectOps::current().connect(socket.as_raw_fd(), addr, sender)?;
ConnectOps::current().connect(socket.as_raw_fd(), addr, sender);
rx.await
.map_err(|_| io::Error::new(io::ErrorKind::Other, "IO Driver is gone"))
@ -73,19 +74,13 @@ enum Change {
Error(io::Error),
}
struct ConnectOpsBatcher {
feed: VecDeque<(usize, Change)>,
struct ConnectOpsHandler {
inner: Rc<ConnectOpsInner>,
}
// A pending connect operation: the socket fd being connected and the
// oneshot sender used to deliver the final result to the awaiting task.
struct Item {
    fd: RawFd,
    sender: Sender<io::Result<()>>,
}
struct ConnectOpsInner {
api: DriverApi,
connects: RefCell<Slab<Item>>,
ops: RefCell<Slab<Sender<io::Result<()>>>>,
}
impl ConnectOps {
@ -98,13 +93,10 @@ impl ConnectOps {
rt.driver().register_handler(|api| {
let ops = Rc::new(ConnectOpsInner {
api,
connects: RefCell::new(Slab::new()),
ops: RefCell::new(Slab::new()),
});
inner = Some(ops.clone());
Box::new(ConnectOpsBatcher {
inner: ops,
feed: VecDeque::new(),
})
Box::new(ConnectOpsHandler { inner: ops })
});
let s = ConnectOps(inner.unwrap());
@ -119,78 +111,28 @@ impl ConnectOps {
fd: RawFd,
addr: SockAddr,
sender: Sender<io::Result<()>>,
) -> io::Result<usize> {
let result = syscall!(break libc::connect(fd, addr.as_ptr(), addr.len()));
) -> usize {
let id = self.0.ops.borrow_mut().insert(sender);
self.0.api.submit(
id as u32,
opcode::Connect::new(Fd(fd), addr.as_ptr(), addr.len()).build(),
);
if let Poll::Ready(res) = result {
res?;
}
let item = Item { fd, sender };
let id = self.0.connects.borrow_mut().insert(item);
self.0.api.register(fd, id, Interest::Writable);
Ok(id)
id
}
}
impl Handler for ConnectOpsBatcher {
fn readable(&mut self, id: usize) {
log::debug!("ConnectFD is readable {:?}", id);
self.feed.push_back((id, Change::Readable));
impl Handler for ConnectOpsHandler {
// A submitted connect op was canceled: drop its pending sender so the
// awaiting future observes the channel closing.
fn canceled(&mut self, user_data: usize) {
    log::debug!("Op is canceled {:?}", user_data);
    self.inner.ops.borrow_mut().remove(user_data);
}
fn writable(&mut self, id: usize) {
log::debug!("ConnectFD is writable {:?}", id);
self.feed.push_back((id, Change::Writable));
}
fn completed(&mut self, user_data: usize, flags: u32, result: io::Result<i32>) {
log::debug!("Op is completed {:?} result: {:?}", user_data, result);
fn error(&mut self, id: usize, err: io::Error) {
self.feed.push_back((id, Change::Error(err)));
}
fn commit(&mut self) {
if self.feed.is_empty() {
return;
}
log::debug!("Commit connect driver changes, num: {:?}", self.feed.len());
let mut connects = self.inner.connects.borrow_mut();
for (id, change) in self.feed.drain(..) {
if connects.contains(id) {
let item = connects.remove(id);
match change {
Change::Readable => unreachable!(),
Change::Writable => {
let mut err: libc::c_int = 0;
let mut err_len =
std::mem::size_of::<libc::c_int>() as libc::socklen_t;
let res = syscall!(libc::getsockopt(
item.fd.as_raw_fd(),
libc::SOL_SOCKET,
libc::SO_ERROR,
&mut err as *mut _ as *mut _,
&mut err_len
));
let res = if err == 0 {
res.map(|_| ())
} else {
Err(io::Error::from_raw_os_error(err))
};
self.inner.api.unregister_all(item.fd);
let _ = item.sender.send(res);
}
Change::Error(err) => {
let _ = item.sender.send(Err(err));
self.inner.api.unregister_all(item.fd);
}
}
}
}
let tx = self.inner.ops.borrow_mut().remove(user_data);
let _ = tx.send(result.map(|_| ()));
}
}

View file

@ -1,13 +1,13 @@
use std::{cell::RefCell, collections::VecDeque, fmt, io, num::NonZeroU32, rc::Rc};
use std::{cell::RefCell, fmt, io, mem, num::NonZeroU32, rc::Rc, task::Poll};
use io_uring::{opcode, types::Fd};
use io_uring::{opcode, squeue::Entry, types::Fd};
use ntex_neon::driver::op::Handler;
use ntex_neon::driver::{AsRawFd, DriverApi};
use ntex_neon::Runtime;
use ntex_util::channel::oneshot;
use slab::Slab;
use ntex_bytes::{BufMut, BytesVec};
use ntex_bytes::{Buf, BufMut, BytesVec};
use ntex_io::IoContext;
pub(crate) struct StreamCtl<T> {
@ -36,10 +36,7 @@ enum Operation {
context: IoContext,
},
Close {
tx: Option<oneshot::Sender<io::Result<()>>>,
},
Cancel {
id: u32,
tx: Option<oneshot::Sender<io::Result<i32>>>,
},
Nop,
}
@ -52,8 +49,13 @@ struct StreamOpsHandler<T> {
struct StreamOpsInner<T> {
api: DriverApi,
feed: RefCell<VecDeque<usize>>,
storage: RefCell<(Slab<StreamItem<T>>, Slab<Operation>)>,
feed: RefCell<Vec<usize>>,
storage: RefCell<StreamOpsStorage<T>>,
}
// Shared mutable state of the stream driver: registered streams plus the
// in-flight io-uring operations, both slab-indexed.
struct StreamOpsStorage<T> {
    streams: Slab<StreamItem<T>>,
    ops: Slab<Operation>,
}
impl<T: AsRawFd + 'static> StreamOps<T> {
@ -64,13 +66,16 @@ impl<T: AsRawFd + 'static> StreamOps<T> {
} else {
let mut inner = None;
rt.driver().register_handler(|api| {
let mut storage = Slab::new();
storage.insert(Operation::Nop);
let mut ops = Slab::new();
ops.insert(Operation::Nop);
let ops = Rc::new(StreamOpsInner {
api,
feed: RefCell::new(VecDeque::new()),
storage: RefCell::new((Slab::new(), Slab::new())),
feed: RefCell::new(Vec::new()),
storage: RefCell::new(StreamOpsStorage {
ops,
streams: Slab::new(),
}),
});
inner = Some(ops.clone());
Box::new(StreamOpsHandler { inner: ops })
@ -92,18 +97,16 @@ impl<T: AsRawFd + 'static> StreamOps<T> {
rd_op: None,
wr_op: None,
};
self.with(|streams| {
let id = streams.0.insert(item);
StreamCtl {
id,
inner: self.0.clone(),
}
})
let id = self.0.storage.borrow_mut().streams.insert(item);
StreamCtl {
id,
inner: self.0.clone(),
}
}
fn with<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut (Slab<StreamItem<T>>, Slab<Operation>)) -> R,
F: FnOnce(&mut StreamOpsStorage<T>) -> R,
{
f(&mut *self.0.storage.borrow_mut())
}
@ -116,48 +119,208 @@ impl<T> Clone for StreamOps<T> {
}
impl<T> Handler for StreamOpsHandler<T> {
fn canceled(&mut self, user_data: usize) {
let mut storage = self.inner.storage.borrow_mut();
match storage.ops.remove(user_data) {
Operation::Recv { id, buf, context } => {
log::debug!("{}: Recv canceled {:?}", context.tag(), id,);
context.release_read_buf(buf);
}
Operation::Send { id, buf, context } => {
log::debug!("{}: Send canceled: {:?}", context.tag(), id);
context.release_write_buf(buf);
}
Operation::Nop | Operation::Close { .. } => {}
}
}
fn completed(&mut self, user_data: usize, flags: u32, result: io::Result<i32>) {
log::debug!("Op is completed {:?} result: {:?}", user_data, result);
let mut storage = self.inner.storage.borrow_mut();
// let mut storage = self.inner.storage.borrow_mut();
// for (id, flags, result) in self.feed.drain(..) {}
let op = storage.ops.remove(user_data);
match op {
Operation::Recv {
id,
mut buf,
context,
} => {
let result = result.map(|size| {
unsafe { buf.advance_mut(size as usize) };
size as usize
});
// // extra
// for id in self.inner.feed.borrow_mut().drain(..) {
// log::debug!("Drop io ({}), {:?}", id, storage.0[id].fd);
// reset op reference
if let Some(item) = storage.streams.get_mut(id) {
log::debug!(
"{}: Recv completed {:?}, res: {:?}, buf({}): {:?}",
context.tag(),
item.fd,
result,
buf.remaining_mut(),
buf,
);
item.rd_op.take();
}
// storage.0[id].ref_count -= 1;
// if storage.0[id].ref_count == 0 {
// let item = storage.0.remove(id);
// if item.io.is_some() {
// // self.inner.api.unregister_all(item.fd);
// }
// }
// }
// set read buf
let tag = context.tag();
if context.set_read_buf(result, buf).is_pending() {
if let Some((id, op)) = storage.recv(id, Some(context)) {
self.inner.api.submit(id, op);
}
} else {
log::debug!("{}: Recv to pause", tag);
}
}
Operation::Send { id, buf, context } => {
// reset op reference
if let Some(item) = storage.streams.get_mut(id) {
log::debug!(
"{}: Send completed: {:?}, res: {:?}",
context.tag(),
item.fd,
result
);
item.wr_op.take();
}
// set read buf
if context
.set_write_buf(result.map(|size| size as usize), buf)
.is_pending()
{
if let Some((id, op)) = storage.send(id, Some(context)) {
self.inner.api.submit(id, op);
}
}
}
Operation::Close { tx } => {
if let Some(tx) = tx {
let _ = tx.send(result);
}
}
Operation::Nop => {}
}
// extra
for id in self.inner.feed.borrow_mut().drain(..) {
storage.streams[id].ref_count -= 1;
if storage.streams[id].ref_count == 0 {
let mut item = storage.streams.remove(id);
log::debug!("{}: Drop io ({}), {:?}", item.context.tag(), id, item.fd);
if let Some(io) = item.io.take() {
mem::forget(io);
let id = storage.ops.insert(Operation::Close { tx: None });
assert!(id < u32::MAX as usize);
self.inner
.api
.submit(id as u32, opcode::Close::new(item.fd).build());
}
}
}
}
}
impl<T> StreamOpsStorage<T> {
fn recv(&mut self, id: usize, context: Option<IoContext>) -> Option<(u32, Entry)> {
let item = &mut self.streams[id];
if item.rd_op.is_none() {
if let Poll::Ready(mut buf) = item.context.get_read_buf() {
log::debug!(
"{}: Recv resume ({}), {:?} - {:?} = {:?}",
item.context.tag(),
id,
item.fd,
buf,
buf.remaining_mut()
);
let slice = buf.chunk_mut();
let op = opcode::Recv::new(item.fd, slice.as_mut_ptr(), slice.len() as u32)
.build();
let op_id = self.ops.insert(Operation::Recv {
id,
buf,
context: context.unwrap_or_else(|| item.context.clone()),
});
assert!(op_id < u32::MAX as usize);
item.rd_op = NonZeroU32::new(op_id as u32);
return Some((op_id as u32, op));
}
}
None
}
fn send(&mut self, id: usize, context: Option<IoContext>) -> Option<(u32, Entry)> {
let item = &mut self.streams[id];
if item.wr_op.is_none() {
if let Poll::Ready(buf) = item.context.get_write_buf() {
log::debug!(
"{}: Send resume ({}), {:?} {:?}",
item.context.tag(),
id,
item.fd,
buf
);
let slice = buf.chunk();
let op =
opcode::Send::new(item.fd, slice.as_ptr(), slice.len() as u32).build();
let op_id = self.ops.insert(Operation::Send {
id,
buf,
context: context.unwrap_or_else(|| item.context.clone()),
});
assert!(op_id < u32::MAX as usize);
item.wr_op = NonZeroU32::new(op_id as u32);
return Some((op_id as u32, op));
}
}
None
}
}
impl<T> StreamCtl<T> {
pub(crate) async fn close(self) -> io::Result<()> {
let result = self.with(|streams| {
let item = &mut streams.0[self.id];
if let Some(io) = item.io.take() {
let result = {
let mut storage = self.inner.storage.borrow_mut();
let (io, fd) = {
let item = &mut storage.streams[self.id];
(item.io.take(), item.fd)
};
if let Some(io) = io {
mem::forget(io);
let (tx, rx) = oneshot::channel();
let id = streams.1.insert(Operation::Close { tx: Some(tx) });
let id = storage.ops.insert(Operation::Close { tx: Some(tx) });
assert!(id < u32::MAX as usize);
drop(storage);
self.inner
.api
.submit(id as u32, opcode::Close::new(item.fd).build());
.submit(id as u32, opcode::Close::new(fd).build());
Some(rx)
} else {
None
}
});
};
if let Some(rx) = result {
rx.await
.map_err(|_| io::Error::new(io::ErrorKind::Other, "gone"))
.and_then(|item| item)
.map(|_| ())
} else {
Ok(())
}
@ -167,87 +330,65 @@ impl<T> StreamCtl<T> {
where
F: FnOnce(Option<&T>) -> R,
{
self.with(|streams| f(streams.0[self.id].io.as_ref()))
f(self.inner.storage.borrow().streams[self.id].io.as_ref())
}
pub(crate) fn resume_read(&self) {
self.with(|streams| {
let item = &mut streams.0[self.id];
if item.rd_op.is_none() {
log::debug!("Resume io read ({}), {:?}", self.id, item.fd);
let mut buf = item.context.get_read_buf();
let slice = buf.chunk_mut();
let op = opcode::Recv::new(item.fd, slice.as_mut_ptr(), slice.len() as u32)
.build();
let id = streams.1.insert(Operation::Recv {
buf,
id: self.id,
context: item.context.clone(),
});
assert!(id < u32::MAX as usize);
self.inner.api.submit(id as u32, op);
}
})
let result = self.inner.storage.borrow_mut().recv(self.id, None);
if let Some((id, op)) = result {
self.inner.api.submit(id, op);
}
}
pub(crate) fn resume_write(&self) {
self.with(|streams| {
let item = &mut streams.0[self.id];
if item.wr_op.is_none() {
log::debug!("Resume io write ({}), {:?}", self.id, item.fd);
//self.inner.api.unregister(item.fd, Interest::Readable);
}
})
let result = self.inner.storage.borrow_mut().send(self.id, None);
if let Some((id, op)) = result {
self.inner.api.submit(id, op);
}
}
pub(crate) fn pause_read(&self) {
self.with(|streams| {
let item = &mut streams.0[self.id];
if let Some(rd_op) = item.rd_op {
log::debug!("Pause io read ({}), {:?}", self.id, item.fd);
let id = streams.1.insert(Operation::Cancel { id: rd_op.get() });
assert!(id < u32::MAX as usize);
self.inner.api.cancel(id as u32, rd_op.get());
}
})
}
fn with<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut (Slab<StreamItem<T>>, Slab<Operation>)) -> R,
{
let mut storage = self.inner.storage.borrow_mut();
f(&mut *storage)
let item = &mut storage.streams[self.id];
if let Some(rd_op) = item.rd_op {
log::debug!(
"{}: Recv to pause ({}), {:?}",
item.context.tag(),
self.id,
item.fd
);
self.inner.api.cancel(rd_op.get());
}
}
}
impl<T> Clone for StreamCtl<T> {
fn clone(&self) -> Self {
self.with(|streams| {
streams.0[self.id].ref_count += 1;
Self {
id: self.id,
inner: self.inner.clone(),
}
})
self.inner.storage.borrow_mut().streams[self.id].ref_count += 1;
Self {
id: self.id,
inner: self.inner.clone(),
}
}
}
impl<T> Drop for StreamCtl<T> {
fn drop(&mut self) {
if let Ok(storage) = &mut self.inner.storage.try_borrow_mut() {
log::debug!("Drop io ({}), {:?}", self.id, storage.0[self.id].fd);
if let Ok(mut storage) = self.inner.storage.try_borrow_mut() {
storage.streams[self.id].ref_count -= 1;
if storage.streams[self.id].ref_count == 0 {
let mut item = storage.streams.remove(self.id);
if let Some(io) = item.io.take() {
log::debug!(
"{}: Close io ({}), {:?}",
item.context.tag(),
self.id,
item.fd
);
mem::forget(io);
storage.0[self.id].ref_count -= 1;
if storage.0[self.id].ref_count == 0 {
let item = storage.0.remove(self.id);
if item.io.is_some() {
let id = storage.1.insert(Operation::Close { tx: None });
let id = storage.ops.insert(Operation::Close { tx: None });
assert!(id < u32::MAX as usize);
self.inner
.api
@ -255,7 +396,7 @@ impl<T> Drop for StreamCtl<T> {
}
}
} else {
self.inner.feed.borrow_mut().push_back(self.id);
self.inner.feed.borrow_mut().push(self.id);
}
}
}
@ -269,11 +410,10 @@ impl<T> PartialEq for StreamCtl<T> {
impl<T: fmt::Debug> fmt::Debug for StreamCtl<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.with(|streams| {
f.debug_struct("StreamCtl")
.field("id", &self.id)
.field("io", &streams.0[self.id].io)
.finish()
})
let storage = self.inner.storage.borrow();
f.debug_struct("StreamCtl")
.field("id", &self.id)
.field("io", &storage.streams[self.id].io)
.finish()
}
}

View file

@ -1,11 +1,11 @@
use std::{any, future::poll_fn, io, task::Poll};
use std::{any, future::poll_fn, task::Poll};
use ntex_io::{
types, Handle, IoContext, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus,
};
use ntex_neon::{net::TcpStream, net::UnixStream, spawn};
use ntex_neon::{net::TcpStream, spawn};
use super::driver::{Closable, StreamOps, StreamCtl};
use super::driver::{StreamCtl, StreamOps};
impl IoStream for super::TcpStream {
fn start(self, read: ReadContext, _: WriteContext) -> Option<Box<dyn Handle>> {
@ -44,26 +44,14 @@ impl Handle for HandleWrapper {
}
}
impl Closable for TcpStream {
async fn close(self) -> io::Result<()> {
TcpStream::close(self).await
}
}
impl Closable for UnixStream {
async fn close(self) -> io::Result<()> {
UnixStream::close(self).await
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum Status {
Shutdown,
Terminate,
}
async fn run<T: Closable>(ctl: StreamCtl<T>, context: IoContext) {
// Handle io read readiness
async fn run<T>(ctl: StreamCtl<T>, context: IoContext) {
// Handle io readiness
let st = poll_fn(|cx| {
let read = match context.poll_read_ready(cx) {
Poll::Ready(ReadStatus::Ready) => {
@ -79,6 +67,7 @@ async fn run<T: Closable>(ctl: StreamCtl<T>, context: IoContext) {
let write = match context.poll_write_ready(cx) {
Poll::Ready(WriteStatus::Ready) => {
log::debug!("{}: write ready", context.tag());
ctl.resume_write();
Poll::Pending
}
@ -97,11 +86,10 @@ async fn run<T: Closable>(ctl: StreamCtl<T>, context: IoContext) {
})
.await;
ctl.pause_read();
ctl.resume_write();
context.shutdown(st == Status::Shutdown).await;
ctl.pause_all();
let result = ctl.close().await;
context.stopped(result.err());
}

View file

@ -3,9 +3,9 @@ use std::{io::Result, net, net::SocketAddr};
use ntex_bytes::PoolRef;
use ntex_io::Io;
//mod connect;
mod connect;
mod driver;
//mod io;
mod io;
/// Tcp stream wrapper for neon TcpStream
struct TcpStream(ntex_neon::net::TcpStream);
@ -15,16 +15,14 @@ struct UnixStream(ntex_neon::net::UnixStream);
/// Opens a TCP connection to a remote host.
pub async fn tcp_connect(addr: SocketAddr) -> Result<Io> {
//let sock = connect::connect(addr).await?;
//Ok(Io::new(TcpStream(sock)))
todo!()
let sock = connect::connect(addr).await?;
Ok(Io::new(TcpStream(sock)))
}
/// Opens a TCP connection to a remote host and use specified memory pool.
pub async fn tcp_connect_in(addr: SocketAddr, pool: PoolRef) -> Result<Io> {
//let sock = connect::connect(addr).await?;
//Ok(Io::with_memory_pool(TcpStream(sock), pool))
todo!()
let sock = connect::connect(addr).await?;
Ok(Io::with_memory_pool(TcpStream(sock), pool))
}
/// Opens a unix stream connection.
@ -32,9 +30,8 @@ pub async fn unix_connect<'a, P>(addr: P) -> Result<Io>
where
P: AsRef<std::path::Path> + 'a,
{
//let sock = connect::connect_unix(addr).await?;
//Ok(Io::new(UnixStream(sock)))
todo!()
let sock = connect::connect_unix(addr).await?;
Ok(Io::new(UnixStream(sock)))
}
/// Opens a unix stream connection and specified memory pool.
@ -42,24 +39,21 @@ pub async fn unix_connect_in<'a, P>(addr: P, pool: PoolRef) -> Result<Io>
where
P: AsRef<std::path::Path> + 'a,
{
//let sock = connect::connect_unix(addr).await?;
//Ok(Io::with_memory_pool(UnixStream(sock), pool))
todo!()
let sock = connect::connect_unix(addr).await?;
Ok(Io::with_memory_pool(UnixStream(sock), pool))
}
/// Convert std TcpStream to tokio's TcpStream
pub fn from_tcp_stream(stream: net::TcpStream) -> Result<Io> {
//stream.set_nodelay(true)?;
//Ok(Io::new(TcpStream(ntex_neon::net::TcpStream::from_std(
//stream,
//)?)))
todo!()
stream.set_nodelay(true)?;
Ok(Io::new(TcpStream(ntex_neon::net::TcpStream::from_std(
stream,
)?)))
}
/// Convert std UnixStream to tokio's UnixStream
pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result<Io> {
//Ok(Io::new(UnixStream(ntex_neon::net::UnixStream::from_std(
// stream,
//)?)))
todo!()
Ok(Io::new(UnixStream(ntex_neon::net::UnixStream::from_std(
stream,
)?)))
}

View file

@ -51,6 +51,9 @@ compio = ["ntex-net/compio"]
# neon runtime
neon = ["ntex-net/neon"]
# neon runtime
neon-uring = ["ntex-net/neon-uring"]
# websocket support
ws = ["dep:sha-1"]

View file

@ -92,6 +92,7 @@ async fn test_run() {
})
})
.unwrap()
.set_tag("test", "SRV")
.run();
let _ = tx.send((srv, ntex::rt::System::current()));
Ok(())