diff --git a/.github/workflows/cov.yml b/.github/workflows/cov.yml index 7d17f589..409549ca 100644 --- a/.github/workflows/cov.yml +++ b/.github/workflows/cov.yml @@ -35,6 +35,9 @@ jobs: - name: Code coverage (neon) run: cargo llvm-cov --no-report --all --no-default-features --features="ntex/neon,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli" + - name: Code coverage (neon-uring) + run: cargo llvm-cov --no-report --all --no-default-features --features="ntex/neon-uring,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli" + - name: Generate coverage report run: cargo llvm-cov report --lcov --output-path lcov.info --ignore-filename-regex="ntex-compio|ntex-tokio" diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 97bc3265..e178d634 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -59,6 +59,11 @@ jobs: run: | cargo test --all --no-default-features --features="ntex/neon,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli" + - name: Run tests (neon-uring) + timeout-minutes: 40 + run: | + cargo test --all --no-default-features --features="ntex/neon-uring,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli" + - name: Install cargo-cache continue-on-error: true run: | diff --git a/README.md b/README.md index f9f9ffb9..443dbf0d 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ ## Usage ntex supports multiple async runtimes, runtime must be selected as a feature. Available options are `compio`, `tokio`, -`glommio` or `async-std`. +`neon` or `neon-uring`. ```toml [dependencies] diff --git a/ntex-io/src/flags.rs b/ntex-io/src/flags.rs index 565f20c0..1e65d2a7 100644 --- a/ntex-io/src/flags.rs +++ b/ntex-io/src/flags.rs @@ -60,6 +60,10 @@ impl Flags { self.contains(Flags::BUF_R_READY) } + pub(crate) fn is_waiting_for_read(&self) -> bool { + self.contains(Flags::RD_NOTIFY) + } + pub(crate) fn cannot_read(self) -> bool { self.intersects(Flags::RD_PAUSED | Flags::BUF_R_FULL) } diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index f274bb50..498e249d 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -437,7 +437,7 @@ impl Io { } else { st.dispatch_task.register(cx.waker()); - let ready = flags.contains(Flags::BUF_R_READY); + let ready = flags.is_read_buf_ready(); if flags.cannot_read() { flags.cleanup_read_flags(); st.read_task.wake(); @@ -558,24 +558,28 @@ impl Io { let st = self.st(); let flags = self.flags(); - if flags.is_stopped() { - Poll::Ready(Err(st.error_or_disconnected())) - } else { - let len = st.buffer.write_destination_size(); - if len > 0 { - if full { - st.insert_flags(Flags::BUF_W_MUST_FLUSH); - st.dispatch_task.register(cx.waker()); - return Poll::Pending; - } else if len >= st.pool.get().write_params_high() << 1 { - st.insert_flags(Flags::BUF_W_BACKPRESSURE); - st.dispatch_task.register(cx.waker()); - return Poll::Pending; - } + let len = st.buffer.write_destination_size(); + if len > 0 { + if full { + st.insert_flags(Flags::BUF_W_MUST_FLUSH); + st.dispatch_task.register(cx.waker()); + return if flags.is_stopped() { + Poll::Ready(Err(st.error_or_disconnected())) + } else { + Poll::Pending + }; + } else if len >= st.pool.get().write_params_high() << 1 { + st.insert_flags(Flags::BUF_W_BACKPRESSURE); + st.dispatch_task.register(cx.waker()); + return if flags.is_stopped() { + Poll::Ready(Err(st.error_or_disconnected())) + } else { + Poll::Pending + }; } - st.remove_flags(Flags::BUF_W_MUST_FLUSH | Flags::BUF_W_BACKPRESSURE); - Poll::Ready(Ok(())) } + st.remove_flags(Flags::BUF_W_MUST_FLUSH | Flags::BUF_W_BACKPRESSURE); + Poll::Ready(Ok(())) } #[inline] diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index ae67263f..75dc2cd8 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -128,7 +128,7 @@ impl ReadContext { ); // dest buffer has new data, wake up dispatcher inner.dispatch_task.wake(); - } else if inner.flags.get().contains(Flags::RD_NOTIFY) { + } else if inner.flags.get().is_waiting_for_read() { // in case of "notify" we must wake up dispatch task // if we read any data from source inner.dispatch_task.wake(); @@ -447,6 +447,280 @@ impl IoContext { } } + /// Get read buffer + pub fn get_read_buf(&self) -> Poll { + let inner = &self.0 .0; + + if let Some(waker) = inner.read_task.take() { + let mut cx = Context::from_waker(&waker); + + if let Poll::Ready(ReadStatus::Ready) = self.0.filter().poll_read_ready(&mut cx) + { + let mut buf = if inner.flags.get().is_read_buf_ready() { + // read buffer is still not read by dispatcher + // we cannot touch it + inner.pool.get().get_read_buf() + } else { + inner + .buffer + .get_read_source() + .unwrap_or_else(|| inner.pool.get().get_read_buf()) + }; + + // make sure we've got room + let (hw, lw) = self.0.memory_pool().read_params().unpack(); + let remaining = buf.remaining_mut(); + if remaining < lw { + buf.reserve(hw - remaining); + } + return Poll::Ready(buf); + } + } + + Poll::Pending + } + + pub fn release_read_buf(&self, buf: BytesVec) { + let inner = &self.0 .0; + if let Some(mut first_buf) = inner.buffer.get_read_source() { + first_buf.extend_from_slice(&buf); + inner.buffer.set_read_source(&self.0, first_buf); + } else { + inner.buffer.set_read_source(&self.0, buf); + } + } + + /// Set read buffer + pub fn set_read_buf(&self, result: io::Result, buf: BytesVec) -> Poll<()> { + let inner = &self.0 .0; + let (hw, _) = self.0.memory_pool().read_params().unpack(); + + if let Some(mut first_buf) = inner.buffer.get_read_source() { + first_buf.extend_from_slice(&buf); + inner.buffer.set_read_source(&self.0, first_buf); + } else { + inner.buffer.set_read_source(&self.0, buf); + } + + match result { + Ok(0) => { + inner.io_stopped(None); + Poll::Ready(()) + } + Ok(nbytes) => { + let filter = self.0.filter(); + let res = filter + .process_read_buf(&self.0, &inner.buffer, 0, nbytes) + .and_then(|status| { + if status.nbytes > 0 { + // dest buffer has new data, wake up dispatcher + if inner.buffer.read_destination_size() >= hw { + log::trace!( + "{}: Io read buffer is too large {}, enable read back-pressure", + self.0.tag(), + nbytes + ); + inner.insert_flags(Flags::BUF_R_READY | Flags::BUF_R_FULL); + } else { + inner.insert_flags(Flags::BUF_R_READY); + + if nbytes >= hw { + // read task is paused because of read back-pressure + // but there is no new data in top most read buffer + // so we need to wake up read task to read more data + // otherwise read task would sleep forever + inner.read_task.wake(); + } + } + log::trace!( + "{}: New {} bytes available, wakeup dispatcher", + self.0.tag(), + nbytes + ); + inner.dispatch_task.wake(); + } else { + if nbytes >= hw { + // read task is paused because of read back-pressure + // but there is no new data in top most read buffer + // so we need to wake up read task to read more data + // otherwise read task would sleep forever + inner.read_task.wake(); + } + if inner.flags.get().is_waiting_for_read() { + // in case of "notify" we must wake up dispatch task + // if we read any data from source + inner.dispatch_task.wake(); + } + } + + // while reading, filter wrote some data + // in that case filters need to process write buffers + // and potentialy wake write task + if status.need_write { + inner.write_task.wake(); + filter.process_write_buf(&self.0, &inner.buffer, 0) + } else { + Ok(()) + } + }); + + if let Err(err) = res { + inner.io_stopped(Some(err)); + Poll::Ready(()) + } else { + self.shutdown_filters(); + Poll::Pending + } + } + Err(e) => { + inner.io_stopped(Some(e)); + Poll::Ready(()) + } + } + } + + /// Get write buffer + pub fn get_write_buf(&self) -> Poll { + let inner = &self.0 .0; + + // check write readiness + if let Some(waker) = inner.write_task.take() { + let ready = self + .0 + .filter() + .poll_write_ready(&mut Context::from_waker(&waker)); + let buf = if matches!( + ready, + Poll::Ready(WriteStatus::Ready | WriteStatus::Shutdown) + ) { + inner.buffer.get_write_destination().and_then(|buf| { + if buf.is_empty() { + None + } else { + Some(buf) + } + }) + } else { + None + }; + + if let Some(buf) = buf { + return Poll::Ready(buf); + } + } + Poll::Pending + } + + pub fn release_write_buf(&self, mut buf: BytesVec) { + let inner = &self.0 .0; + + if let Some(b) = inner.buffer.get_write_destination() { + buf.extend_from_slice(&b); + self.0.memory_pool().release_write_buf(b); + } + inner.buffer.set_write_destination(buf); + + // if write buffer is smaller than high watermark value, turn off back-pressure + let len = inner.buffer.write_destination_size(); + let mut flags = inner.flags.get(); + + if len == 0 { + if flags.is_waiting_for_write() { + flags.waiting_for_write_is_done(); + inner.dispatch_task.wake(); + } + flags.insert(Flags::WR_PAUSED); + inner.flags.set(flags); + } else if flags.contains(Flags::BUF_W_BACKPRESSURE) + && len < inner.pool.get().write_params_high() << 1 + { + flags.remove(Flags::BUF_W_BACKPRESSURE); + inner.flags.set(flags); + inner.dispatch_task.wake(); + } + inner.flags.set(flags); + } + + /// Set write buffer + pub fn set_write_buf(&self, result: io::Result, mut buf: BytesVec) -> Poll<()> { + let result = match result { + Ok(0) => { + log::trace!("{}: Disconnected during flush", self.tag()); + Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write frame to transport", + )) + } + Ok(n) => { + if n == buf.len() { + buf.clear(); + Ok(0) + } else { + buf.advance(n); + Ok(buf.len()) + } + } + Err(e) => Err(e), + }; + + let inner = &self.0 .0; + + // set buffer back + let result = match result { + Ok(0) => { + // log::debug!("{}: WROTE ALL {:?}", self.0.tag(), inner.buffer.write_destination_size()); + self.0.memory_pool().release_write_buf(buf); + Ok(inner.buffer.write_destination_size()) + } + Ok(_) => { + if let Some(b) = inner.buffer.get_write_destination() { + buf.extend_from_slice(&b); + self.0.memory_pool().release_write_buf(b); + } + let l = buf.len(); + // log::debug!("{}: WROTE SOME {:?}", self.0.tag(), l); + inner.buffer.set_write_destination(buf); + Ok(l) + } + Err(e) => Err(e), + }; + + let mut flags = inner.flags.get(); + match result { + Ok(0) => { + // all data has been written + flags.insert(Flags::WR_PAUSED); + + if flags.is_task_waiting_for_write() { + flags.task_waiting_for_write_is_done(); + inner.write_task.wake(); + } + + if flags.is_waiting_for_write() { + flags.waiting_for_write_is_done(); + inner.dispatch_task.wake(); + } + inner.flags.set(flags); + Poll::Ready(()) + } + Ok(len) => { + // if write buffer is smaller than high watermark value, turn off back-pressure + if flags.contains(Flags::BUF_W_BACKPRESSURE) + && len < inner.pool.get().write_params_high() << 1 + { + flags.remove(Flags::BUF_W_BACKPRESSURE); + inner.flags.set(flags); + inner.dispatch_task.wake(); + } + Poll::Pending + } + Err(e) => { + inner.io_stopped(Some(e)); + Poll::Ready(()) + } + } + } + /// Get read buffer pub fn with_read_buf(&self, f: F) -> Poll<()> where @@ -509,7 +783,7 @@ impl IoContext { // otherwise read task would sleep forever inner.read_task.wake(); } - if inner.flags.get().contains(Flags::RD_NOTIFY) { + if inner.flags.get().is_waiting_for_read() { // in case of "notify" we must wake up dispatch task // if we read any data from source inner.dispatch_task.wake(); diff --git a/ntex-neon/src/driver/driver_type.rs b/ntex-neon/src/driver/driver_type.rs index 61d39aa2..3738bb9d 100644 --- a/ntex-neon/src/driver/driver_type.rs +++ b/ntex-neon/src/driver/driver_type.rs @@ -54,13 +54,6 @@ impl DriverType { OpenAt::CODE, Close::CODE, Shutdown::CODE, - // Linux kernel 5.19 - #[cfg(any( - feature = "io-uring-sqe128", - feature = "io-uring-cqe32", - feature = "io-uring-socket" - ))] - Socket::CODE, ]; (|| { diff --git a/ntex-neon/src/driver/key.rs b/ntex-neon/src/driver/key.rs index 9d7be808..97b7495e 100644 --- a/ntex-neon/src/driver/key.rs +++ b/ntex-neon/src/driver/key.rs @@ -160,14 +160,6 @@ impl Key { this.cancelled } - pub(crate) fn set_flags(&mut self, flags: u32) { - self.as_opaque_mut().flags = flags; - } - - pub(crate) fn flags(&self) -> u32 { - self.as_opaque().flags - } - /// Whether the op is completed. pub(crate) fn has_result(&self) -> bool { self.as_opaque().result.is_ready() diff --git a/ntex-neon/src/driver/mod.rs b/ntex-neon/src/driver/mod.rs index dea1814d..346a7305 100644 --- a/ntex-neon/src/driver/mod.rs +++ b/ntex-neon/src/driver/mod.rs @@ -37,33 +37,15 @@ pub use asyncify::*; mod driver_type; pub use driver_type::*; -thread_local! { - static LOGGING: std::cell::Cell = const { std::cell::Cell::new(false) }; -} - -/// enable logging for thread -pub fn enable_logging() { - LOGGING.with(|v| v.set(true)); -} - -/// enable logging for thread -pub fn log>(s: T) { - LOGGING.with(|_v| { - //if _v.get() { - println!("{}", s.as_ref()); - //} - }); -} - cfg_if::cfg_if! { //if #[cfg(windows)] { // #[path = "iocp/mod.rs"] // mod sys; - //} else if #[cfg(all(target_os = "linux", feature = "io-uring"))] { - // #[path = "iour/mod.rs"] - // mod sys; //} else - if #[cfg(unix)] { + if #[cfg(all(target_os = "linux", feature = "io-uring"))] { + #[path = "uring/mod.rs"] + mod sys; + } else if #[cfg(unix)] { #[path = "poll/mod.rs"] mod sys; } @@ -248,17 +230,6 @@ impl Proactor { }) } - /// Attach an fd to the driver. - /// - /// ## Platform specific - /// * IOCP: it will be attached to the completion port. An fd could only be - /// attached to one driver, and could only be attached once, even if you - /// `try_clone` it. - /// * io-uring & polling: it will do nothing but return `Ok(())`. - pub fn attach(&self, fd: RawFd) -> io::Result<()> { - self.driver.attach(fd) - } - /// Cancel an operation with the pushed user-defined data. /// /// The cancellation is not reliable. The underlying operation may continue, @@ -305,11 +276,10 @@ impl Proactor { /// # Panics /// This function will panic if the requested operation has not been /// completed. - pub fn pop(&self, op: Key) -> PushEntry, ((io::Result, T), u32)> { + pub fn pop(&self, op: Key) -> PushEntry, (io::Result, T)> { if op.has_result() { - let flags = op.flags(); // SAFETY: completed. - PushEntry::Ready((unsafe { op.into_inner() }, flags)) + PushEntry::Ready(unsafe { op.into_inner() }) } else { PushEntry::Pending(op) } @@ -327,7 +297,7 @@ impl Proactor { pub fn register_handler(&self, f: F) where - F: FnOnce(DriverApi) -> Box, + F: FnOnce(DriverApi) -> Box, { self.driver.register_handler(f) } @@ -346,23 +316,12 @@ impl AsRawFd for Proactor { pub(crate) struct Entry { user_data: usize, result: io::Result, - flags: u32, } #[cfg(unix)] impl Entry { pub(crate) fn new(user_data: usize, result: io::Result) -> Self { - Self { - user_data, - result, - flags: 0, - } - } - - #[cfg(all(target_os = "linux", feature = "io-uring"))] - // this method only used by in io-uring driver - pub(crate) fn set_flags(&mut self, flags: u32) { - self.flags = flags; + Self { user_data, result } } /// The user-defined data returned by [`Proactor::push`]. @@ -370,10 +329,6 @@ impl Entry { self.user_data } - pub fn flags(&self) -> u32 { - self.flags - } - /// The result of the operation. pub fn into_result(self) -> io::Result { self.result @@ -383,7 +338,6 @@ impl Entry { pub unsafe fn notify(self) { let user_data = self.user_data(); let mut op = Key::<()>::new_unchecked(user_data); - op.set_flags(self.flags()); if op.set_result(self.into_result()) { // SAFETY: completed and cancelled. let _ = op.into_box(); @@ -428,7 +382,6 @@ impl ThreadPoolBuilder { pub struct ProactorBuilder { capacity: u32, pool_builder: ThreadPoolBuilder, - sqpoll_idle: Option, } #[cfg(unix)] @@ -445,7 +398,6 @@ impl ProactorBuilder { Self { capacity: 1024, pool_builder: ThreadPoolBuilder::new(), - sqpoll_idle: None, } } @@ -496,19 +448,6 @@ impl ProactorBuilder { self.pool_builder.create_or_reuse() } - /// Set `io-uring` sqpoll idle milliseconds, when `sqpoll_idle` is set, - /// io-uring sqpoll feature will be enabled - /// - /// # Notes - /// - /// - Only effective when the `io-uring` feature is enabled - /// - `idle` must >= 1ms, otherwise will set sqpoll idle 0ms - /// - `idle` will be rounded down - pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self { - self.sqpoll_idle = Some(idle); - self - } - /// Build the [`Proactor`]. pub fn build(&self) -> io::Result { Proactor::with_builder(self) diff --git a/ntex-neon/src/driver/op.rs b/ntex-neon/src/driver/op.rs index 0296960c..65769550 100644 --- a/ntex-neon/src/driver/op.rs +++ b/ntex-neon/src/driver/op.rs @@ -4,26 +4,10 @@ //! The operation itself doesn't perform anything. //! You need to pass them to [`crate::Proactor`], and poll the driver. -use std::{io, marker::PhantomPinned, mem::ManuallyDrop, net::Shutdown}; +use std::{marker::PhantomPinned, net::Shutdown}; #[cfg(unix)] -pub use super::sys::op::{CreateSocket, Interest}; - -use super::OwnedFd; - -pub trait Handler { - /// Submitted interest - fn readable(&mut self, id: usize); - - /// Submitted interest - fn writable(&mut self, id: usize); - - /// Operation submission has failed - fn error(&mut self, id: usize, err: io::Error); - - /// All events are processed, process all updates - fn commit(&mut self); -} +pub use super::sys::op::*; /// Spawn a blocking function in the thread pool. pub struct Asyncify { @@ -59,17 +43,3 @@ impl ShutdownSocket { Self { fd, how } } } - -/// Close socket fd. -pub struct CloseSocket { - pub(crate) fd: ManuallyDrop, -} - -impl CloseSocket { - /// Create [`CloseSocket`]. - pub fn new(fd: OwnedFd) -> Self { - Self { - fd: ManuallyDrop::new(fd), - } - } -} diff --git a/ntex-neon/src/driver/poll/mod.rs b/ntex-neon/src/driver/poll/mod.rs index 07866270..39162a14 100644 --- a/ntex-neon/src/driver/poll/mod.rs +++ b/ntex-neon/src/driver/poll/mod.rs @@ -8,9 +8,7 @@ use crossbeam_queue::SegQueue; use nohash_hasher::IntMap; use polling::{Event, Events, Poller}; -use crate::driver::{ - op::Handler, op::Interest, sys, AsyncifyPool, Entry, Key, ProactorBuilder, -}; +use crate::driver::{op::Interest, sys, AsyncifyPool, Entry, Key, ProactorBuilder}; pub(crate) mod op; @@ -187,9 +185,10 @@ pub(crate) struct Driver { registry: RefCell>, pool: AsyncifyPool, pool_completed: Arc>, + hid: Cell, changes: Rc>>, - handlers: Cell>>>>, + handlers: Cell>>>>, } impl Driver { @@ -216,7 +215,7 @@ impl Driver { pub fn register_handler(&self, f: F) where - F: FnOnce(DriverApi) -> Box, + F: FnOnce(DriverApi) -> Box, { let id = self.hid.get(); let mut handlers = self.handlers.take().unwrap_or_default(); @@ -234,10 +233,6 @@ impl Driver { Key::new(self.as_raw_fd(), op) } - pub fn attach(&self, _fd: RawFd) -> io::Result<()> { - Ok(()) - } - pub fn push(&self, op: &mut Key) -> Poll> { let user_data = op.user_data(); let op_pin = op.as_op_pin(); diff --git a/ntex-neon/src/driver/poll/op.rs b/ntex-neon/src/driver/poll/op.rs index 686d70e2..7b5ad728 100644 --- a/ntex-neon/src/driver/poll/op.rs +++ b/ntex-neon/src/driver/poll/op.rs @@ -1,10 +1,24 @@ -use std::{io, marker::Send, os::fd::FromRawFd, os::fd::RawFd, pin::Pin, task::Poll}; +use std::{io, marker::Send, mem, os::fd::FromRawFd, os::fd::RawFd, pin::Pin, task::Poll}; pub use crate::driver::unix::op::*; -use super::{AsRawFd, Decision, OpCode}; +use super::{AsRawFd, Decision, OpCode, OwnedFd}; use crate::{driver::op::*, syscall}; +pub trait Handler { + /// Submitted interest + fn readable(&mut self, id: usize); + + /// Submitted interest + fn writable(&mut self, id: usize); + + /// Operation submission has failed + fn error(&mut self, id: usize, err: io::Error); + + /// All events are processed, process all updates + fn commit(&mut self); +} + impl OpCode for Asyncify where D: Send + 'static, @@ -27,6 +41,20 @@ where } } +/// Close socket fd. +pub struct CloseSocket { + pub(crate) fd: mem::ManuallyDrop, +} + +impl CloseSocket { + /// Create [`CloseSocket`]. + pub fn new(fd: OwnedFd) -> Self { + Self { + fd: mem::ManuallyDrop::new(fd), + } + } +} + impl OpCode for CreateSocket { fn pre_submit(self: Pin<&mut Self>) -> io::Result { Ok(Decision::Blocking) diff --git a/ntex-neon/src/driver/uring/mod.rs b/ntex-neon/src/driver/uring/mod.rs new file mode 100644 index 00000000..cd3c8354 --- /dev/null +++ b/ntex-neon/src/driver/uring/mod.rs @@ -0,0 +1,340 @@ +pub use std::os::fd::{AsRawFd, OwnedFd, RawFd}; + +use std::cell::{Cell, RefCell}; +use std::{ + collections::VecDeque, io, pin::Pin, rc::Rc, sync::Arc, task::Poll, time::Duration, +}; + +use crossbeam_queue::SegQueue; +use io_uring::cqueue::{more, Entry as CEntry}; +use io_uring::opcode::{AsyncCancel, PollAdd}; +use io_uring::squeue::Entry as SEntry; +use io_uring::types::{Fd, SubmitArgs, Timespec}; +use io_uring::IoUring; + +mod notify; +pub(crate) mod op; + +pub use self::notify::NotifyHandle; + +use self::notify::Notifier; +use crate::driver::{sys, AsyncifyPool, Entry, Key, ProactorBuilder}; + +/// Abstraction of io-uring operations. +pub trait OpCode { + /// Name of the operation + fn name(&self) -> &'static str; + + /// Call the operation in a blocking way. This method will only be called if + /// [`create_entry`] returns [`OpEntry::Blocking`]. + fn call_blocking(self: Pin<&mut Self>) -> io::Result { + unreachable!("this operation is asynchronous") + } + + /// Set the result when it successfully completes. + /// The operation stores the result and is responsible to release it if the + /// operation is cancelled. + /// + /// # Safety + /// + /// Users should not call it. + unsafe fn set_result(self: Pin<&mut Self>, _: usize) {} +} + +#[derive(Debug)] +enum Change { + Submit { entry: SEntry }, + Cancel { op_id: u64 }, +} + +pub struct DriverApi { + batch: u64, + changes: Rc>>, +} + +impl DriverApi { + pub fn submit(&self, user_data: u32, entry: SEntry) { + log::debug!( + "Submit operation batch: {:?} user-data: {:?} entry: {:?}", + self.batch >> Driver::BATCH, + user_data, + entry, + ); + self.changes.borrow_mut().push_back(Change::Submit { + entry: entry.user_data(user_data as u64 | self.batch), + }); + } + + pub fn cancel(&self, op_id: u32) { + log::debug!( + "Cancel operation batch: {:?} user-data: {:?}", + self.batch >> Driver::BATCH, + op_id + ); + self.changes.borrow_mut().push_back(Change::Cancel { + op_id: op_id as u64 | self.batch, + }); + } +} + +/// Low-level driver of io-uring. +pub(crate) struct Driver { + ring: RefCell>, + notifier: Notifier, + pool: AsyncifyPool, + pool_completed: Arc>, + + hid: Cell, + changes: Rc>>, + handlers: Cell>>>>, +} + +impl Driver { + const NOTIFY: u64 = u64::MAX; + const CANCEL: u64 = u64::MAX - 1; + const BATCH: u64 = 48; + const BATCH_MASK: u64 = 0xFFFF_0000_0000_0000; + const DATA_MASK: u64 = 0x0000_FFFF_FFFF_FFFF; + + pub fn new(builder: &ProactorBuilder) -> io::Result { + log::trace!("New io-uring driver"); + + let mut ring = IoUring::builder() + .setup_coop_taskrun() + .setup_single_issuer() + .build(builder.capacity)?; + + let notifier = Notifier::new()?; + + #[allow(clippy::useless_conversion)] + unsafe { + ring.submission() + .push( + &PollAdd::new(Fd(notifier.as_raw_fd()), libc::POLLIN as _) + .multi(true) + .build() + .user_data(Self::NOTIFY) + .into(), + ) + .expect("the squeue sould not be full"); + } + Ok(Self { + notifier, + ring: RefCell::new(ring), + pool: builder.create_or_get_thread_pool(), + pool_completed: Arc::new(SegQueue::new()), + + hid: Cell::new(0), + changes: Rc::new(RefCell::new(VecDeque::new())), + handlers: Cell::new(Some(Box::new(Vec::new()))), + }) + } + + pub fn register_handler(&self, f: F) + where + F: FnOnce(DriverApi) -> Box, + { + let id = self.hid.get(); + let mut handlers = self.handlers.take().unwrap_or_default(); + + let api = DriverApi { + batch: id << 48, + changes: self.changes.clone(), + }; + handlers.push(f(api)); + self.hid.set(id + 1); + self.handlers.set(Some(handlers)); + } + + // Auto means that it choose to wait or not automatically. + fn submit_auto(&self, timeout: Option) -> io::Result<()> { + let mut ring = self.ring.borrow_mut(); + let res = { + // Last part of submission queue, wait till timeout. + if let Some(duration) = timeout { + let timespec = timespec(duration); + let args = SubmitArgs::new().timespec(×pec); + ring.submitter().submit_with_args(1, &args) + } else { + ring.submit_and_wait(1) + } + }; + match res { + Ok(_) => { + // log::debug!("Submit result: {res:?} {:?}", timeout); + if ring.completion().is_empty() { + Err(io::ErrorKind::TimedOut.into()) + } else { + Ok(()) + } + } + Err(e) => match e.raw_os_error() { + Some(libc::ETIME) => { + if timeout.is_some() && timeout != Some(Duration::ZERO) { + Err(io::ErrorKind::TimedOut.into()) + } else { + Ok(()) + } + } + Some(libc::EBUSY) | Some(libc::EAGAIN) => { + Err(io::ErrorKind::Interrupted.into()) + } + _ => Err(e), + }, + } + } + + pub fn create_op(&self, op: T) -> Key { + Key::new(self.as_raw_fd(), op) + } + + fn apply_changes(&self) -> bool { + let mut changes = self.changes.borrow_mut(); + if changes.is_empty() { + return false; + } + log::debug!("Apply changes, {:?}", changes.len()); + + let mut ring = self.ring.borrow_mut(); + let mut squeue = ring.submission(); + + while let Some(change) = changes.pop_front() { + match change { + Change::Submit { entry } => { + if unsafe { squeue.push(&entry) }.is_err() { + changes.push_front(Change::Submit { entry }); + break; + } + } + Change::Cancel { op_id } => { + let entry = AsyncCancel::new(op_id).build().user_data(Self::CANCEL); + if unsafe { squeue.push(&entry) }.is_err() { + changes.push_front(Change::Cancel { op_id }); + break; + } + } + } + } + squeue.sync(); + + !changes.is_empty() + } + + pub unsafe fn poll( + &self, + timeout: Option, + f: F, + ) -> io::Result<()> { + self.poll_blocking(); + + let has_more = self.apply_changes(); + let poll_result = self.poll_completions(); + + if !poll_result || has_more { + if has_more { + self.submit_auto(Some(Duration::ZERO))?; + } else { + self.submit_auto(timeout)?; + } + self.poll_completions(); + } + + f(); + + Ok(()) + } + + pub fn push(&self, op: &mut Key) -> Poll> { + log::trace!("Push op: {:?}", op.as_op_pin().name()); + + let user_data = op.user_data(); + loop { + if self.push_blocking(user_data) { + break Poll::Pending; + } else { + self.poll_blocking(); + } + } + } + + fn poll_completions(&self) -> bool { + let mut ring = self.ring.borrow_mut(); + let mut cqueue = ring.completion(); + cqueue.sync(); + let has_entry = !cqueue.is_empty(); + if !has_entry { + return false; + } + let mut handlers = self.handlers.take().unwrap(); + for entry in cqueue { + let user_data = entry.user_data(); + match user_data { + Self::CANCEL => {} + Self::NOTIFY => { + let flags = entry.flags(); + debug_assert!(more(flags)); + self.notifier.clear().expect("cannot clear notifier"); + } + _ => { + let batch = ((user_data & Self::BATCH_MASK) >> Self::BATCH) as usize; + let user_data = (user_data & Self::DATA_MASK) as usize; + + let result = entry.result(); + + if result == -libc::ECANCELED { + handlers[batch].canceled(user_data); + } else { + let result = if result < 0 { + Err(io::Error::from_raw_os_error(result)) + } else { + Ok(result as _) + }; + handlers[batch].completed(user_data, entry.flags(), result); + } + } + } + } + self.handlers.set(Some(handlers)); + true + } + + fn poll_blocking(&self) { + if !self.pool_completed.is_empty() { + while let Some(entry) = self.pool_completed.pop() { + unsafe { + entry.notify(); + } + } + } + } + + fn push_blocking(&self, user_data: usize) -> bool { + let handle = self.handle(); + let completed = self.pool_completed.clone(); + self.pool + .dispatch(move || { + let mut op = unsafe { Key::::new_unchecked(user_data) }; + let op_pin = op.as_op_pin(); + let res = op_pin.call_blocking(); + completed.push(Entry::new(user_data, res)); + handle.notify().ok(); + }) + .is_ok() + } + + pub fn handle(&self) -> NotifyHandle { + self.notifier.handle() + } +} + +impl AsRawFd for Driver { + fn as_raw_fd(&self) -> RawFd { + self.ring.borrow().as_raw_fd() + } +} + +fn timespec(duration: std::time::Duration) -> Timespec { + Timespec::new() + .sec(duration.as_secs()) + .nsec(duration.subsec_nanos()) +} diff --git a/ntex-neon/src/driver/uring/notify.rs b/ntex-neon/src/driver/uring/notify.rs new file mode 100644 index 00000000..faa9cae0 --- /dev/null +++ b/ntex-neon/src/driver/uring/notify.rs @@ -0,0 +1,73 @@ +use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd}; +use std::{io, mem, sync::Arc}; + +use crate::syscall; + +#[derive(Debug)] +pub(crate) struct Notifier { + fd: Arc, +} + +impl Notifier { + /// Create a new notifier. + pub(crate) fn new() -> io::Result { + let fd = syscall!(libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?; + let fd = unsafe { OwnedFd::from_raw_fd(fd) }; + Ok(Self { fd: Arc::new(fd) }) + } + + pub(crate) fn clear(&self) -> io::Result<()> { + loop { + let mut buffer = [0u64]; + let res = syscall!(libc::read( + self.fd.as_raw_fd(), + buffer.as_mut_ptr().cast(), + mem::size_of::() + )); + match res { + Ok(len) => { + debug_assert_eq!(len, mem::size_of::() as _); + break Ok(()); + } + // Clear the next time:) + Err(e) if e.kind() == io::ErrorKind::WouldBlock => break Ok(()), + // Just like read_exact + Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(e) => break Err(e), + } + } + } + + pub(crate) fn handle(&self) -> NotifyHandle { + NotifyHandle::new(self.fd.clone()) + } +} + +impl AsRawFd for Notifier { + fn as_raw_fd(&self) -> RawFd { + self.fd.as_raw_fd() + } +} + +#[derive(Clone)] +/// A notify handle to the inner driver. +pub struct NotifyHandle { + fd: Arc, +} + +impl NotifyHandle { + pub(crate) fn new(fd: Arc) -> Self { + Self { fd } + } + + /// Notify the inner driver. + pub fn notify(&self) -> io::Result<()> { + let data = 1u64; + syscall!(libc::write( + self.fd.as_raw_fd(), + &data as *const _ as *const _, + std::mem::size_of::(), + ))?; + Ok(()) + } +} diff --git a/ntex-neon/src/driver/uring/op.rs b/ntex-neon/src/driver/uring/op.rs new file mode 100644 index 00000000..e4256af4 --- /dev/null +++ b/ntex-neon/src/driver/uring/op.rs @@ -0,0 +1,55 @@ +use std::{io, os::fd::AsRawFd, pin::Pin}; + +pub use crate::driver::unix::op::*; + +use super::OpCode; +use crate::{driver::op::*, syscall}; + +pub trait Handler { + /// Operation is completed + fn completed(&mut self, user_data: usize, flags: u32, result: io::Result); + + fn canceled(&mut self, user_data: usize); +} + +impl OpCode for Asyncify +where + D: Send + 'static, + F: (FnOnce() -> (io::Result, D)) + Send + 'static, +{ + fn name(&self) -> &'static str { + "Asyncify" + } + + fn call_blocking(self: Pin<&mut Self>) -> std::io::Result { + // Safety: self won't be moved + let this = unsafe { self.get_unchecked_mut() }; + let f = this + .f + .take() + .expect("the operate method could only be called once"); + let (res, data) = f(); + this.data = Some(data); + res + } +} + +impl OpCode for CreateSocket { + fn name(&self) -> &'static str { + "CreateSocket" + } + + fn call_blocking(self: Pin<&mut Self>) -> io::Result { + Ok(syscall!(libc::socket(self.domain, self.socket_type, self.protocol))? as _) + } +} + +impl OpCode for ShutdownSocket { + fn name(&self) -> &'static str { + "ShutdownSocket" + } + + fn call_blocking(self: Pin<&mut Self>) -> io::Result { + Ok(syscall!(libc::shutdown(self.fd.as_raw_fd(), self.how()))? as _) + } +} diff --git a/ntex-neon/src/net/socket.rs b/ntex-neon/src/net/socket.rs index 6653dbea..09be0200 100644 --- a/ntex-neon/src/net/socket.rs +++ b/ntex-neon/src/net/socket.rs @@ -1,10 +1,9 @@ #![allow(clippy::missing_safety_doc)] -use std::{future::Future, io, mem, mem::MaybeUninit}; +use std::{io, mem::MaybeUninit}; use socket2::{Domain, Protocol, SockAddr, Socket as Socket2, Type}; -use crate::driver::{op::CloseSocket, op::ShutdownSocket, AsRawFd}; -use crate::{impl_raw_fd, syscall}; +use crate::{driver::AsRawFd, impl_raw_fd, syscall}; #[derive(Debug)] pub struct Socket { @@ -120,22 +119,6 @@ impl Socket { Ok(socket) } - pub fn close(self) -> impl Future> { - let op = CloseSocket::from_raw_fd(self.as_raw_fd()); - let fut = crate::submit(op); - mem::forget(self); - async move { - fut.await.0?; - Ok(()) - } - } - - pub async fn shutdown(&self) -> io::Result<()> { - let op = ShutdownSocket::new(self.as_raw_fd(), std::net::Shutdown::Write); - crate::submit(op).await.0?; - Ok(()) - } - #[cfg(unix)] pub unsafe fn get_socket_option( &self, diff --git a/ntex-neon/src/net/tcp.rs b/ntex-neon/src/net/tcp.rs index 1d7c613e..84b6c014 100644 --- a/ntex-neon/src/net/tcp.rs +++ b/ntex-neon/src/net/tcp.rs @@ -1,4 +1,4 @@ -use std::{future::Future, io, net::SocketAddr}; +use std::{io, net::SocketAddr}; use socket2::Socket as Socket2; @@ -26,11 +26,6 @@ impl TcpStream { Self { inner } } - /// Close the socket. - pub fn close(self) -> impl Future> { - self.inner.close() - } - /// Returns the socket address of the remote peer of this TCP connection. pub fn peer_addr(&self) -> io::Result { self.inner diff --git a/ntex-neon/src/net/unix.rs b/ntex-neon/src/net/unix.rs index 39b155d1..833865d0 100644 --- a/ntex-neon/src/net/unix.rs +++ b/ntex-neon/src/net/unix.rs @@ -1,4 +1,4 @@ -use std::{future::Future, io}; +use std::io; use socket2::{SockAddr, Socket as Socket2}; @@ -27,12 +27,6 @@ impl UnixStream { Self { inner } } - /// Close the socket. If the returned future is dropped before polling, the - /// socket won't be closed. - pub fn close(self) -> impl Future> { - self.inner.close() - } - /// Returns the socket path of the remote peer of this connection. pub fn peer_addr(&self) -> io::Result { #[allow(unused_mut)] diff --git a/ntex-neon/src/op.rs b/ntex-neon/src/op.rs index 9949f8eb..5138ed4c 100644 --- a/ntex-neon/src/op.rs +++ b/ntex-neon/src/op.rs @@ -15,7 +15,7 @@ impl OpFuture { } impl Future for OpFuture { - type Output = ((io::Result, T), u32); + type Output = (io::Result, T); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let res = Runtime::with_current(|r| r.poll_task(cx, self.key.take().unwrap())); diff --git a/ntex-neon/src/rt.rs b/ntex-neon/src/rt.rs index f4a5cb9f..a4197671 100644 --- a/ntex-neon/src/rt.rs +++ b/ntex-neon/src/rt.rs @@ -110,14 +110,6 @@ impl Runtime { } } - /// Attach a raw file descriptor/handle/socket to the runtime. - /// - /// You only need this when authoring your own high-level APIs. High-level - /// resources in this crate are attached automatically. - pub fn attach(&self, fd: RawFd) -> io::Result<()> { - self.driver.attach(fd) - } - /// Block on the future till it completes. pub fn block_on(&self, future: F) -> F::Output { CURRENT_RUNTIME.set(self, || { @@ -176,7 +168,7 @@ impl Runtime { // It is safe to use `submit` here because the task is spawned immediately. unsafe { let fut = self.submit_with_flags(op); - self.spawn_unchecked(async move { fut.await.0 .1.into_inner() }) + self.spawn_unchecked(async move { fut.await.1.into_inner() }) } } @@ -190,17 +182,13 @@ impl Runtime { fn submit_with_flags( &self, op: T, - ) -> impl Future, T), u32)> { + ) -> impl Future, T)> { let fut = self.submit_raw(op); async move { match fut { PushEntry::Pending(user_data) => OpFuture::new(user_data).await, - PushEntry::Ready(res) => { - // submit_flags won't be ready immediately, if ready, it must be error without - // flags - (res, 0) - } + PushEntry::Ready(res) => res, } } } @@ -213,7 +201,7 @@ impl Runtime { &self, cx: &mut Context, op: Key, - ) -> PushEntry, ((io::Result, T), u32)> { + ) -> PushEntry, (io::Result, T)> { self.driver.pop(op).map_pending(|mut k| { self.driver.update_waker(&mut k, cx.waker().clone()); k @@ -415,7 +403,7 @@ pub fn spawn_blocking( /// This method doesn't create runtime. It tries to obtain the current runtime /// by [`Runtime::with_current`]. pub async fn submit(op: T) -> (io::Result, T) { - submit_with_flags(op).await.0 + submit_with_flags(op).await } /// Submit an operation to the current runtime, and return a future for it with @@ -425,8 +413,6 @@ pub async fn submit(op: T) -> (io::Result, T) { /// /// This method doesn't create runtime. It tries to obtain the current runtime /// by [`Runtime::with_current`]. -pub async fn submit_with_flags( - op: T, -) -> ((io::Result, T), u32) { +pub async fn submit_with_flags(op: T) -> (io::Result, T) { Runtime::with_current(|r| r.submit_with_flags(op)).await } diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index 461cc180..5495a535 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,8 +1,8 @@ # Changes -## [2.5.0] - 2025-03-10 +## [2.5.0] - 2025-03-12 -* Add ntex-runtime support +* Add neon runtime support * Drop glommio support diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index c876ba6b..4154fca2 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -24,8 +24,11 @@ tokio = ["ntex-rt/tokio", "ntex-tokio"] # compio runtime compio = ["ntex-rt/compio", "ntex-compio"] -# default ntex runtime -neon = ["ntex-rt/neon", "ntex-neon", "slab", "socket2"] +# neon runtime +neon = ["ntex-rt/neon", "ntex-neon/polling", "slab", "socket2"] + +# neon io-uring runtime +neon-uring = ["ntex-rt/neon", "ntex-neon/io-uring", "io-uring", "slab", "socket2"] [dependencies] ntex-service = "3.3" @@ -46,6 +49,10 @@ thiserror = { workspace = true } slab = { workspace = true, optional = true } socket2 = { workspace = true, optional = true } +# Linux specific dependencies +[target.'cfg(target_os = "linux")'.dependencies] +io-uring = { workspace = true, optional = true } + [dev-dependencies] ntex = "2" env_logger = "0.11" diff --git a/ntex-net/src/compat.rs b/ntex-net/src/compat.rs index a35ce681..83df7232 100644 --- a/ntex-net/src/compat.rs +++ b/ntex-net/src/compat.rs @@ -6,24 +6,53 @@ pub use ntex_tokio::{from_tcp_stream, tcp_connect, tcp_connect_in}; #[cfg(all(unix, feature = "tokio"))] pub use ntex_tokio::{from_unix_stream, unix_connect, unix_connect_in}; -#[cfg(all(feature = "neon", not(feature = "tokio"), not(feature = "compio")))] -pub use crate::rt::{ +#[cfg(all( + feature = "neon", + not(feature = "neon-uring"), + not(feature = "tokio"), + not(feature = "compio") +))] +pub use crate::rt_polling::{ from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect, unix_connect_in, }; -#[cfg(all(feature = "compio", not(feature = "tokio"), not(feature = "neon")))] +#[cfg(all( + feature = "neon-uring", + not(feature = "neon"), + not(feature = "tokio"), + not(feature = "compio"), + target_os = "linux", + feature = "io-uring" +))] +pub use crate::rt_uring::{ + from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect, + unix_connect_in, +}; + +#[cfg(all( + feature = "compio", + not(feature = "tokio"), + not(feature = "neon"), + not(feature = "neon-uring") +))] pub use ntex_compio::{from_tcp_stream, tcp_connect, tcp_connect_in}; #[cfg(all( unix, feature = "compio", not(feature = "tokio"), - not(feature = "neon") + not(feature = "neon"), + not(feature = "neon-uring") ))] pub use ntex_compio::{from_unix_stream, unix_connect, unix_connect_in}; -#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))] +#[cfg(all( + not(feature = "tokio"), + not(feature = "compio"), + not(feature = "neon"), + not(feature = "neon-uring") +))] mod no_rt { use ntex_io::Io; @@ -88,5 +117,10 @@ mod no_rt { } } -#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))] +#[cfg(all( + not(feature = "tokio"), + not(feature = "compio"), + not(feature = "neon"), + not(feature = "neon-uring") +))] pub use no_rt::*; diff --git a/ntex-net/src/lib.rs b/ntex-net/src/lib.rs index e8f5d729..7ae574ab 100644 --- a/ntex-net/src/lib.rs +++ b/ntex-net/src/lib.rs @@ -1,5 +1,6 @@ //! Utility for async runtime abstraction #![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)] +#![allow(unused_variables, dead_code)] mod compat; pub mod connect; @@ -9,5 +10,20 @@ pub use ntex_rt::{spawn, spawn_blocking}; pub use self::compat::*; -#[cfg(all(feature = "neon", not(feature = "tokio"), not(feature = "compio")))] -mod rt; +#[cfg(all( + feature = "neon", + not(feature = "neon-uring"), + not(feature = "tokio"), + not(feature = "compio") +))] +mod rt_polling; + +#[cfg(all( + feature = "neon-uring", + not(feature = "neon"), + not(feature = "tokio"), + not(feature = "compio"), + target_os = "linux", + feature = "io-uring" +))] +mod rt_uring; diff --git a/ntex-net/src/rt/connect.rs b/ntex-net/src/rt_polling/connect.rs similarity index 100% rename from ntex-net/src/rt/connect.rs rename to ntex-net/src/rt_polling/connect.rs diff --git a/ntex-net/src/rt/driver.rs b/ntex-net/src/rt_polling/driver.rs similarity index 77% rename from ntex-net/src/rt/driver.rs rename to ntex-net/src/rt_polling/driver.rs index 5c821f8b..68bb59d5 100644 --- a/ntex-net/src/rt/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -1,6 +1,6 @@ use std::{cell::Cell, collections::VecDeque, fmt, io, ptr, rc::Rc, task, task::Poll}; -use ntex_neon::driver::op::{Handler, Interest}; +use ntex_neon::driver::op::{CloseSocket, Handler, Interest}; use ntex_neon::driver::{AsRawFd, DriverApi, RawFd}; use ntex_neon::{syscall, Runtime}; use slab::Slab; @@ -19,10 +19,10 @@ bitflags::bitflags! { pub(crate) struct StreamCtl { id: usize, - inner: Rc>, + inner: Rc>, } -struct TcpStreamItem { +struct StreamItem { io: Option, fd: RawFd, context: IoContext, @@ -30,7 +30,7 @@ struct TcpStreamItem { ref_count: usize, } -pub(crate) struct CompioOps(Rc>); +pub(crate) struct StreamOps(Rc>); #[derive(Debug)] enum Change { @@ -39,18 +39,18 @@ enum Change { Error(io::Error), } -struct CompioOpsBatcher { +struct StreamOpsHandler { feed: VecDeque<(usize, Change)>, - inner: Rc>, + inner: Rc>, } -struct CompioOpsInner { +struct StreamOpsInner { api: DriverApi, feed: Cell>>, - streams: Cell>>>>, + streams: Cell>>>>, } -impl CompioOps { +impl StreamOps { pub(crate) fn current() -> Self { Runtime::with_current(|rt| { if let Some(s) = rt.get::() { @@ -58,19 +58,19 @@ impl CompioOps { } else { let mut inner = None; rt.driver().register_handler(|api| { - let ops = Rc::new(CompioOpsInner { + let ops = Rc::new(StreamOpsInner { api, feed: Cell::new(Some(VecDeque::new())), streams: Cell::new(Some(Box::new(Slab::new()))), }); inner = Some(ops.clone()); - Box::new(CompioOpsBatcher { + Box::new(StreamOpsHandler { inner: ops, feed: VecDeque::new(), }) }); - let s = CompioOps(inner.unwrap()); + let s = StreamOps(inner.unwrap()); rt.insert(s.clone()); s } @@ -78,7 +78,7 @@ impl CompioOps { } pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl { - let item = TcpStreamItem { + let item = StreamItem { context, fd: io.as_raw_fd(), io: Some(io), @@ -96,7 +96,7 @@ impl CompioOps { fn with(&self, f: F) -> R where - F: FnOnce(&mut Slab>) -> R, + F: FnOnce(&mut Slab>) -> R, { let mut inner = self.0.streams.take().unwrap(); let result = f(&mut inner); @@ -105,13 +105,13 @@ impl CompioOps { } } -impl Clone for CompioOps { +impl Clone for StreamOps { fn clone(&self) -> Self { Self(self.0.clone()) } } -impl Handler for CompioOpsBatcher { +impl Handler for StreamOpsHandler { fn readable(&mut self, id: usize) { log::debug!("FD is readable {:?}", id); self.feed.push_back((id, Change::Readable)); @@ -149,7 +149,8 @@ impl Handler for CompioOpsBatcher { .inspect(|size| { unsafe { buf.advance_mut(*size) }; log::debug!( - "FD: {:?}, SIZE: {:?}, BUF: {:?}", + "{}: {:?}, SIZE: {:?}, BUF: {:?}", + item.context.tag(), item.fd, size, buf @@ -193,10 +194,11 @@ impl Handler for CompioOpsBatcher { // extra let mut feed = self.inner.feed.take().unwrap(); for id in feed.drain(..) { - log::debug!("Drop io ({}), {:?}", id, streams[id].fd); + let item = &mut streams[id]; + log::debug!("{}: Drop io ({}), {:?}", item.context.tag(), id, item.fd); - streams[id].ref_count -= 1; - if streams[id].ref_count == 0 { + item.ref_count -= 1; + if item.ref_count == 0 { let item = streams.remove(id); if item.io.is_some() { self.inner.api.unregister_all(item.fd); @@ -209,20 +211,17 @@ impl Handler for CompioOpsBatcher { } } -pub(crate) trait Closable { - async fn close(self) -> io::Result<()>; -} - impl StreamCtl { - pub(crate) async fn close(self) -> io::Result<()> - where - T: Closable, - { - if let Some(io) = self.with(|streams| streams[self.id].io.take()) { - io.close().await - } else { - Ok(()) + pub(crate) async fn close(self) -> io::Result<()> { + let (io, fd) = + self.with(|streams| (streams[self.id].io.take(), streams[self.id].fd)); + if let Some(io) = io { + let op = CloseSocket::from_raw_fd(fd); + let fut = ntex_neon::submit(op); + std::mem::forget(io); + fut.await.0?; } + Ok(()) } pub(crate) fn with_io(&self, f: F) -> R @@ -237,7 +236,12 @@ impl StreamCtl { let item = &mut streams[self.id]; if item.flags.intersects(Flags::RD | Flags::WR) { - log::debug!("Pause all io ({}), {:?}", self.id, item.fd); + log::debug!( + "{}: Pause all io ({}), {:?}", + item.context.tag(), + self.id, + item.fd + ); item.flags.remove(Flags::RD | Flags::WR); self.inner.api.unregister_all(item.fd); } @@ -248,7 +252,12 @@ impl StreamCtl { self.with(|streams| { let item = &mut streams[self.id]; - log::debug!("Pause io read ({}), {:?}", self.id, item.fd); + log::debug!( + "{}: Pause io read ({}), {:?}", + item.context.tag(), + self.id, + item.fd + ); if item.flags.contains(Flags::RD) { item.flags.remove(Flags::RD); self.inner.api.unregister(item.fd, Interest::Readable); @@ -260,7 +269,12 @@ impl StreamCtl { self.with(|streams| { let item = &mut streams[self.id]; - log::debug!("Resume io read ({}), {:?}", self.id, item.fd); + log::debug!( + "{}: Resume io read ({}), {:?}", + item.context.tag(), + self.id, + item.fd + ); if !item.flags.contains(Flags::RD) { item.flags.insert(Flags::RD); self.inner @@ -275,9 +289,19 @@ impl StreamCtl { let item = &mut streams[self.id]; if !item.flags.contains(Flags::WR) { - log::debug!("Resume io write ({}), {:?}", self.id, item.fd); + log::debug!( + "{}: Resume io write ({}), {:?}", + item.context.tag(), + self.id, + item.fd + ); let result = item.context.with_write_buf(|buf| { - log::debug!("Writing io ({}), buf: {:?}", self.id, buf.len()); + log::debug!( + "{}: Writing io ({}), buf: {:?}", + item.context.tag(), + self.id, + buf.len() + ); let slice = &buf[..]; syscall!(break libc::write(item.fd, slice.as_ptr() as _, slice.len())) @@ -285,7 +309,8 @@ impl StreamCtl { if result.is_pending() { log::debug!( - "Write is pending ({}), {:?}", + "{}: Write is pending ({}), {:?}", + item.context.tag(), self.id, item.context.flags() ); @@ -301,7 +326,7 @@ impl StreamCtl { fn with(&self, f: F) -> R where - F: FnOnce(&mut Slab>) -> R, + F: FnOnce(&mut Slab>) -> R, { let mut inner = self.inner.streams.take().unwrap(); let result = f(&mut inner); @@ -325,7 +350,12 @@ impl Clone for StreamCtl { impl Drop for StreamCtl { fn drop(&mut self) { if let Some(mut streams) = self.inner.streams.take() { - log::debug!("Drop io ({}), {:?}", self.id, streams[self.id].fd); + log::debug!( + "{}: Drop io ({}), {:?}", + streams[self.id].context.tag(), + self.id, + streams[self.id].fd + ); streams[self.id].ref_count -= 1; if streams[self.id].ref_count == 0 { diff --git a/ntex-net/src/rt/io.rs b/ntex-net/src/rt_polling/io.rs similarity index 80% rename from ntex-net/src/rt/io.rs rename to ntex-net/src/rt_polling/io.rs index 56305728..e755384f 100644 --- a/ntex-net/src/rt/io.rs +++ b/ntex-net/src/rt_polling/io.rs @@ -1,17 +1,17 @@ -use std::{any, future::poll_fn, io, task::Poll}; +use std::{any, future::poll_fn, task::Poll}; use ntex_io::{ types, Handle, IoContext, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus, }; -use ntex_neon::{net::TcpStream, net::UnixStream, spawn}; +use ntex_neon::{net::TcpStream, spawn}; -use super::driver::{Closable, CompioOps, StreamCtl}; +use super::driver::{StreamCtl, StreamOps}; impl IoStream for super::TcpStream { fn start(self, read: ReadContext, _: WriteContext) -> Option> { let io = self.0; let context = read.context(); - let ctl = CompioOps::current().register(io, context.clone()); + let ctl = StreamOps::current().register(io, context.clone()); let ctl2 = ctl.clone(); spawn(async move { run(ctl, context).await }).detach(); @@ -23,7 +23,7 @@ impl IoStream for super::UnixStream { fn start(self, read: ReadContext, _: WriteContext) -> Option> { let io = self.0; let context = read.context(); - let ctl = CompioOps::current().register(io, context.clone()); + let ctl = StreamOps::current().register(io, context.clone()); spawn(async move { run(ctl, context).await }).detach(); None @@ -44,25 +44,13 @@ impl Handle for HandleWrapper { } } -impl Closable for TcpStream { - async fn close(self) -> io::Result<()> { - TcpStream::close(self).await - } -} - -impl Closable for UnixStream { - async fn close(self) -> io::Result<()> { - UnixStream::close(self).await - } -} - #[derive(Copy, Clone, Debug, PartialEq, Eq)] enum Status { Shutdown, Terminate, } -async fn run(ctl: StreamCtl, context: IoContext) { +async fn run(ctl: StreamCtl, context: IoContext) { // Handle io read readiness let st = poll_fn(|cx| { let read = match context.poll_read_ready(cx) { diff --git a/ntex-net/src/rt/mod.rs b/ntex-net/src/rt_polling/mod.rs similarity index 92% rename from ntex-net/src/rt/mod.rs rename to ntex-net/src/rt_polling/mod.rs index 956ffccd..abb4e633 100644 --- a/ntex-net/src/rt/mod.rs +++ b/ntex-net/src/rt_polling/mod.rs @@ -1,4 +1,3 @@ -#![allow(clippy::type_complexity)] use std::{io::Result, net, net::SocketAddr}; use ntex_bytes::PoolRef; @@ -8,10 +7,10 @@ mod connect; mod driver; mod io; -/// Tcp stream wrapper for compio TcpStream +/// Tcp stream wrapper for neon TcpStream struct TcpStream(ntex_neon::net::TcpStream); -/// Tcp stream wrapper for compio UnixStream +/// Tcp stream wrapper for neon UnixStream struct UnixStream(ntex_neon::net::UnixStream); /// Opens a TCP connection to a remote host. diff --git a/ntex-net/src/rt_uring/connect.rs b/ntex-net/src/rt_uring/connect.rs new file mode 100644 index 00000000..3b58910e --- /dev/null +++ b/ntex-net/src/rt_uring/connect.rs @@ -0,0 +1,138 @@ +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::{cell::RefCell, io, path::Path, rc::Rc}; + +use io_uring::{opcode, types::Fd}; +use ntex_neon::driver::op::Handler; +use ntex_neon::driver::{AsRawFd, DriverApi, RawFd}; +use ntex_neon::net::{Socket, TcpStream, UnixStream}; +use ntex_neon::Runtime; +use ntex_util::channel::oneshot::{channel, Sender}; +use slab::Slab; +use socket2::{Protocol, SockAddr, Type}; + +pub(crate) async fn connect(addr: SocketAddr) -> io::Result { + let addr = SockAddr::from(addr); + let socket = if cfg!(windows) { + let bind_addr = if addr.is_ipv4() { + SockAddr::from(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)) + } else if addr.is_ipv6() { + SockAddr::from(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0)) + } else { + return Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "Unsupported address domain.", + )); + }; + Socket::bind(&bind_addr, Type::STREAM, Some(Protocol::TCP)).await? + } else { + Socket::new(addr.domain(), Type::STREAM, Some(Protocol::TCP)).await? + }; + + let (sender, rx) = channel(); + + ConnectOps::current().connect(socket.as_raw_fd(), addr, sender); + + rx.await + .map_err(|_| io::Error::new(io::ErrorKind::Other, "IO Driver is gone")) + .and_then(|item| item)?; + + Ok(TcpStream::from_socket(socket)) +} + +pub(crate) async fn connect_unix(path: impl AsRef) -> io::Result { + let addr = SockAddr::unix(path)?; + + #[cfg(windows)] + let socket = { + let new_addr = empty_unix_socket(); + Socket::bind(&new_addr, Type::STREAM, None).await? + }; + #[cfg(unix)] + let socket = { + use socket2::Domain; + Socket::new(Domain::UNIX, Type::STREAM, None).await? + }; + + let (sender, rx) = channel(); + + ConnectOps::current().connect(socket.as_raw_fd(), addr, sender); + + rx.await + .map_err(|_| io::Error::new(io::ErrorKind::Other, "IO Driver is gone")) + .and_then(|item| item)?; + + Ok(UnixStream::from_socket(socket)) +} + +#[derive(Clone)] +pub(crate) struct ConnectOps(Rc); + +#[derive(Debug)] +enum Change { + Readable, + Writable, + Error(io::Error), +} + +struct ConnectOpsHandler { + inner: Rc, +} + +struct ConnectOpsInner { + api: DriverApi, + ops: RefCell>>>, +} + +impl ConnectOps { + pub(crate) fn current() -> Self { + Runtime::with_current(|rt| { + if let Some(s) = rt.get::() { + s + } else { + let mut inner = None; + rt.driver().register_handler(|api| { + let ops = Rc::new(ConnectOpsInner { + api, + ops: RefCell::new(Slab::new()), + }); + inner = Some(ops.clone()); + Box::new(ConnectOpsHandler { inner: ops }) + }); + + let s = ConnectOps(inner.unwrap()); + rt.insert(s.clone()); + s + } + }) + } + + pub(crate) fn connect( + &self, + fd: RawFd, + addr: SockAddr, + sender: Sender>, + ) -> usize { + let id = self.0.ops.borrow_mut().insert(sender); + self.0.api.submit( + id as u32, + opcode::Connect::new(Fd(fd), addr.as_ptr(), addr.len()).build(), + ); + + id + } +} + +impl Handler for ConnectOpsHandler { + fn canceled(&mut self, user_data: usize) { + log::debug!("Op is canceled {:?}", user_data); + + self.inner.ops.borrow_mut().remove(user_data); + } + + fn completed(&mut self, user_data: usize, flags: u32, result: io::Result) { + log::debug!("Op is completed {:?} result: {:?}", user_data, result); + + let tx = self.inner.ops.borrow_mut().remove(user_data); + let _ = tx.send(result.map(|_| ())); + } +} diff --git a/ntex-net/src/rt_uring/driver.rs b/ntex-net/src/rt_uring/driver.rs new file mode 100644 index 00000000..2dddea43 --- /dev/null +++ b/ntex-net/src/rt_uring/driver.rs @@ -0,0 +1,419 @@ +use std::{cell::RefCell, fmt, io, mem, num::NonZeroU32, rc::Rc, task::Poll}; + +use io_uring::{opcode, squeue::Entry, types::Fd}; +use ntex_neon::driver::op::Handler; +use ntex_neon::driver::{AsRawFd, DriverApi}; +use ntex_neon::Runtime; +use ntex_util::channel::oneshot; +use slab::Slab; + +use ntex_bytes::{Buf, BufMut, BytesVec}; +use ntex_io::IoContext; + +pub(crate) struct StreamCtl { + id: usize, + inner: Rc>, +} + +struct StreamItem { + io: Option, + fd: Fd, + context: IoContext, + ref_count: usize, + rd_op: Option, + wr_op: Option, +} + +enum Operation { + Recv { + id: usize, + buf: BytesVec, + context: IoContext, + }, + Send { + id: usize, + buf: BytesVec, + context: IoContext, + }, + Close { + tx: Option>>, + }, + Nop, +} + +pub(crate) struct StreamOps(Rc>); + +struct StreamOpsHandler { + inner: Rc>, +} + +struct StreamOpsInner { + api: DriverApi, + feed: RefCell>, + storage: RefCell>, +} + +struct StreamOpsStorage { + ops: Slab, + streams: Slab>, +} + +impl StreamOps { + pub(crate) fn current() -> Self { + Runtime::with_current(|rt| { + if let Some(s) = rt.get::() { + s + } else { + let mut inner = None; + rt.driver().register_handler(|api| { + let mut ops = Slab::new(); + ops.insert(Operation::Nop); + + let ops = Rc::new(StreamOpsInner { + api, + feed: RefCell::new(Vec::new()), + storage: RefCell::new(StreamOpsStorage { + ops, + streams: Slab::new(), + }), + }); + inner = Some(ops.clone()); + Box::new(StreamOpsHandler { inner: ops }) + }); + + let s = StreamOps(inner.unwrap()); + rt.insert(s.clone()); + s + } + }) + } + + pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl { + let item = StreamItem { + context, + fd: Fd(io.as_raw_fd()), + io: Some(io), + ref_count: 1, + rd_op: None, + wr_op: None, + }; + let id = self.0.storage.borrow_mut().streams.insert(item); + StreamCtl { + id, + inner: self.0.clone(), + } + } + + fn with(&self, f: F) -> R + where + F: FnOnce(&mut StreamOpsStorage) -> R, + { + f(&mut *self.0.storage.borrow_mut()) + } +} + +impl Clone for StreamOps { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl Handler for StreamOpsHandler { + fn canceled(&mut self, user_data: usize) { + let mut storage = self.inner.storage.borrow_mut(); + + match storage.ops.remove(user_data) { + Operation::Recv { id, buf, context } => { + log::debug!("{}: Recv canceled {:?}", context.tag(), id,); + context.release_read_buf(buf); + } + Operation::Send { id, buf, context } => { + log::debug!("{}: Send canceled: {:?}", context.tag(), id); + context.release_write_buf(buf); + } + Operation::Nop | Operation::Close { .. } => {} + } + } + + fn completed(&mut self, user_data: usize, flags: u32, result: io::Result) { + let mut storage = self.inner.storage.borrow_mut(); + + let op = storage.ops.remove(user_data); + match op { + Operation::Recv { + id, + mut buf, + context, + } => { + let result = result.map(|size| { + unsafe { buf.advance_mut(size as usize) }; + size as usize + }); + + // reset op reference + if let Some(item) = storage.streams.get_mut(id) { + log::debug!( + "{}: Recv completed {:?}, res: {:?}, buf({}): {:?}", + context.tag(), + item.fd, + result, + buf.remaining_mut(), + buf, + ); + item.rd_op.take(); + } + + // set read buf + let tag = context.tag(); + if context.set_read_buf(result, buf).is_pending() { + if let Some((id, op)) = storage.recv(id, Some(context)) { + self.inner.api.submit(id, op); + } + } else { + log::debug!("{}: Recv to pause", tag); + } + } + Operation::Send { id, buf, context } => { + // reset op reference + if let Some(item) = storage.streams.get_mut(id) { + log::debug!( + "{}: Send completed: {:?}, res: {:?}", + context.tag(), + item.fd, + result + ); + item.wr_op.take(); + } + + // set read buf + if context + .set_write_buf(result.map(|size| size as usize), buf) + .is_pending() + { + if let Some((id, op)) = storage.send(id, Some(context)) { + self.inner.api.submit(id, op); + } + } + } + Operation::Close { tx } => { + if let Some(tx) = tx { + let _ = tx.send(result); + } + } + Operation::Nop => {} + } + + // extra + for id in self.inner.feed.borrow_mut().drain(..) { + storage.streams[id].ref_count -= 1; + if storage.streams[id].ref_count == 0 { + let mut item = storage.streams.remove(id); + + log::debug!("{}: Drop io ({}), {:?}", item.context.tag(), id, item.fd); + + if let Some(io) = item.io.take() { + mem::forget(io); + + let id = storage.ops.insert(Operation::Close { tx: None }); + assert!(id < u32::MAX as usize); + self.inner + .api + .submit(id as u32, opcode::Close::new(item.fd).build()); + } + } + } + } +} + +impl StreamOpsStorage { + fn recv(&mut self, id: usize, context: Option) -> Option<(u32, Entry)> { + let item = &mut self.streams[id]; + + if item.rd_op.is_none() { + if let Poll::Ready(mut buf) = item.context.get_read_buf() { + log::debug!( + "{}: Recv resume ({}), {:?} - {:?} = {:?}", + item.context.tag(), + id, + item.fd, + buf, + buf.remaining_mut() + ); + + let slice = buf.chunk_mut(); + let op = opcode::Recv::new(item.fd, slice.as_mut_ptr(), slice.len() as u32) + .build(); + + let op_id = self.ops.insert(Operation::Recv { + id, + buf, + context: context.unwrap_or_else(|| item.context.clone()), + }); + assert!(op_id < u32::MAX as usize); + + item.rd_op = NonZeroU32::new(op_id as u32); + return Some((op_id as u32, op)); + } + } + None + } + + fn send(&mut self, id: usize, context: Option) -> Option<(u32, Entry)> { + let item = &mut self.streams[id]; + + if item.wr_op.is_none() { + if let Poll::Ready(buf) = item.context.get_write_buf() { + log::debug!( + "{}: Send resume ({}), {:?} {:?}", + item.context.tag(), + id, + item.fd, + buf + ); + + let slice = buf.chunk(); + let op = + opcode::Send::new(item.fd, slice.as_ptr(), slice.len() as u32).build(); + + let op_id = self.ops.insert(Operation::Send { + id, + buf, + context: context.unwrap_or_else(|| item.context.clone()), + }); + assert!(op_id < u32::MAX as usize); + + item.wr_op = NonZeroU32::new(op_id as u32); + return Some((op_id as u32, op)); + } + } + None + } +} + +impl StreamCtl { + pub(crate) async fn close(self) -> io::Result<()> { + let result = { + let mut storage = self.inner.storage.borrow_mut(); + + let (io, fd) = { + let item = &mut storage.streams[self.id]; + (item.io.take(), item.fd) + }; + if let Some(io) = io { + mem::forget(io); + + let (tx, rx) = oneshot::channel(); + let id = storage.ops.insert(Operation::Close { tx: Some(tx) }); + assert!(id < u32::MAX as usize); + + drop(storage); + self.inner + .api + .submit(id as u32, opcode::Close::new(fd).build()); + Some(rx) + } else { + None + } + }; + + if let Some(rx) = result { + rx.await + .map_err(|_| io::Error::new(io::ErrorKind::Other, "gone")) + .and_then(|item| item) + .map(|_| ()) + } else { + Ok(()) + } + } + + pub(crate) fn with_io(&self, f: F) -> R + where + F: FnOnce(Option<&T>) -> R, + { + f(self.inner.storage.borrow().streams[self.id].io.as_ref()) + } + + pub(crate) fn resume_read(&self) { + let result = self.inner.storage.borrow_mut().recv(self.id, None); + if let Some((id, op)) = result { + self.inner.api.submit(id, op); + } + } + + pub(crate) fn resume_write(&self) { + let result = self.inner.storage.borrow_mut().send(self.id, None); + if let Some((id, op)) = result { + self.inner.api.submit(id, op); + } + } + + pub(crate) fn pause_read(&self) { + let mut storage = self.inner.storage.borrow_mut(); + let item = &mut storage.streams[self.id]; + + if let Some(rd_op) = item.rd_op { + log::debug!( + "{}: Recv to pause ({}), {:?}", + item.context.tag(), + self.id, + item.fd + ); + self.inner.api.cancel(rd_op.get()); + } + } +} + +impl Clone for StreamCtl { + fn clone(&self) -> Self { + self.inner.storage.borrow_mut().streams[self.id].ref_count += 1; + Self { + id: self.id, + inner: self.inner.clone(), + } + } +} + +impl Drop for StreamCtl { + fn drop(&mut self) { + if let Ok(mut storage) = self.inner.storage.try_borrow_mut() { + storage.streams[self.id].ref_count -= 1; + if storage.streams[self.id].ref_count == 0 { + let mut item = storage.streams.remove(self.id); + if let Some(io) = item.io.take() { + log::debug!( + "{}: Close io ({}), {:?}", + item.context.tag(), + self.id, + item.fd + ); + mem::forget(io); + + let id = storage.ops.insert(Operation::Close { tx: None }); + assert!(id < u32::MAX as usize); + self.inner + .api + .submit(id as u32, opcode::Close::new(item.fd).build()); + } + } + } else { + self.inner.feed.borrow_mut().push(self.id); + } + } +} + +impl PartialEq for StreamCtl { + #[inline] + fn eq(&self, other: &StreamCtl) -> bool { + self.id == other.id && std::ptr::eq(&self.inner, &other.inner) + } +} + +impl fmt::Debug for StreamCtl { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let storage = self.inner.storage.borrow(); + f.debug_struct("StreamCtl") + .field("id", &self.id) + .field("io", &storage.streams[self.id].io) + .finish() + } +} diff --git a/ntex-net/src/rt_uring/io.rs b/ntex-net/src/rt_uring/io.rs new file mode 100644 index 00000000..8457abd3 --- /dev/null +++ b/ntex-net/src/rt_uring/io.rs @@ -0,0 +1,95 @@ +use std::{any, future::poll_fn, task::Poll}; + +use ntex_io::{ + types, Handle, IoContext, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus, +}; +use ntex_neon::{net::TcpStream, spawn}; + +use super::driver::{StreamCtl, StreamOps}; + +impl IoStream for super::TcpStream { + fn start(self, read: ReadContext, _: WriteContext) -> Option> { + let io = self.0; + let context = read.context(); + let ctl = StreamOps::current().register(io, context.clone()); + let ctl2 = ctl.clone(); + spawn(async move { run(ctl, context).await }).detach(); + + Some(Box::new(HandleWrapper(ctl2))) + } +} + +impl IoStream for super::UnixStream { + fn start(self, read: ReadContext, _: WriteContext) -> Option> { + let io = self.0; + let context = read.context(); + let ctl = StreamOps::current().register(io, context.clone()); + spawn(async move { run(ctl, context).await }).detach(); + + None + } +} + +struct HandleWrapper(StreamCtl); + +impl Handle for HandleWrapper { + fn query(&self, id: any::TypeId) -> Option> { + if id == any::TypeId::of::() { + let addr = self.0.with_io(|io| io.and_then(|io| io.peer_addr().ok())); + if let Some(addr) = addr { + return Some(Box::new(types::PeerAddr(addr))); + } + } + None + } +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +enum Status { + Shutdown, + Terminate, +} + +async fn run(ctl: StreamCtl, context: IoContext) { + // Handle io readiness + let st = poll_fn(|cx| { + let read = match context.poll_read_ready(cx) { + Poll::Ready(ReadStatus::Ready) => { + ctl.resume_read(); + Poll::Pending + } + Poll::Ready(ReadStatus::Terminate) => Poll::Ready(()), + Poll::Pending => { + ctl.pause_read(); + Poll::Pending + } + }; + + let write = match context.poll_write_ready(cx) { + Poll::Ready(WriteStatus::Ready) => { + log::debug!("{}: write ready", context.tag()); + ctl.resume_write(); + Poll::Pending + } + Poll::Ready(WriteStatus::Shutdown) => Poll::Ready(Status::Shutdown), + Poll::Ready(WriteStatus::Terminate) => Poll::Ready(Status::Terminate), + Poll::Pending => Poll::Pending, + }; + + if read.is_pending() && write.is_pending() { + Poll::Pending + } else if write.is_ready() { + write + } else { + Poll::Ready(Status::Terminate) + } + }) + .await; + + ctl.pause_read(); + ctl.resume_write(); + context.shutdown(st == Status::Shutdown).await; + + let result = ctl.close().await; + context.stopped(result.err()); +} diff --git a/ntex-net/src/rt_uring/mod.rs b/ntex-net/src/rt_uring/mod.rs new file mode 100644 index 00000000..abb4e633 --- /dev/null +++ b/ntex-net/src/rt_uring/mod.rs @@ -0,0 +1,59 @@ +use std::{io::Result, net, net::SocketAddr}; + +use ntex_bytes::PoolRef; +use ntex_io::Io; + +mod connect; +mod driver; +mod io; + +/// Tcp stream wrapper for neon TcpStream +struct TcpStream(ntex_neon::net::TcpStream); + +/// Tcp stream wrapper for neon UnixStream +struct UnixStream(ntex_neon::net::UnixStream); + +/// Opens a TCP connection to a remote host. +pub async fn tcp_connect(addr: SocketAddr) -> Result { + let sock = connect::connect(addr).await?; + Ok(Io::new(TcpStream(sock))) +} + +/// Opens a TCP connection to a remote host and use specified memory pool. +pub async fn tcp_connect_in(addr: SocketAddr, pool: PoolRef) -> Result { + let sock = connect::connect(addr).await?; + Ok(Io::with_memory_pool(TcpStream(sock), pool)) +} + +/// Opens a unix stream connection. +pub async fn unix_connect<'a, P>(addr: P) -> Result +where + P: AsRef + 'a, +{ + let sock = connect::connect_unix(addr).await?; + Ok(Io::new(UnixStream(sock))) +} + +/// Opens a unix stream connection and specified memory pool. +pub async fn unix_connect_in<'a, P>(addr: P, pool: PoolRef) -> Result +where + P: AsRef + 'a, +{ + let sock = connect::connect_unix(addr).await?; + Ok(Io::with_memory_pool(UnixStream(sock), pool)) +} + +/// Convert std TcpStream to tokio's TcpStream +pub fn from_tcp_stream(stream: net::TcpStream) -> Result { + stream.set_nodelay(true)?; + Ok(Io::new(TcpStream(ntex_neon::net::TcpStream::from_std( + stream, + )?))) +} + +/// Convert std UnixStream to tokio's UnixStream +pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result { + Ok(Io::new(UnixStream(ntex_neon::net::UnixStream::from_std( + stream, + )?))) +} diff --git a/ntex-rt/CHANGES.md b/ntex-rt/CHANGES.md index 1dcb4e05..7617b236 100644 --- a/ntex-rt/CHANGES.md +++ b/ntex-rt/CHANGES.md @@ -1,6 +1,6 @@ # Changes -## [0.4.26] - 2025-03-xx +## [0.4.26] - 2025-03-12 * Add "neon" runtime support diff --git a/ntex-rt/src/arbiter.rs b/ntex-rt/src/arbiter.rs index 1ce2c409..ab804e0d 100644 --- a/ntex-rt/src/arbiter.rs +++ b/ntex-rt/src/arbiter.rs @@ -154,6 +154,7 @@ impl Arbiter { .try_send(ArbiterCommand::Execute(Box::pin(future))); } + #[rustfmt::skip] /// Send a function to the Arbiter's thread. This function will be executed asynchronously. /// A future is created, and when resolved will contain the result of the function sent /// to the Arbiters thread. diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs index e642412d..8c3ebf83 100644 --- a/ntex-rt/src/lib.rs +++ b/ntex-rt/src/lib.rs @@ -259,7 +259,7 @@ mod neon { /// completes. pub fn block_on>(fut: F) { log::info!( - "Starting compio runtime, driver {:?}", + "Starting neon runtime, driver {:?}", ntex_neon::driver::DriverType::current() ); let rt = Runtime::new().unwrap(); diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 520d3420..539557c1 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,8 +1,8 @@ # Changes -## [2.12.0] - 2025-03-10 +## [2.12.0] - 2025-03-12 -* Add "ntex-runtime" support +* Add neon runtime support * Drop glommio support diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 8b685f93..a51a9f9c 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -51,6 +51,9 @@ compio = ["ntex-net/compio"] # neon runtime neon = ["ntex-net/neon"] +# neon runtime +neon-uring = ["ntex-net/neon-uring"] + # websocket support ws = ["dep:sha-1"] diff --git a/ntex/tests/server.rs b/ntex/tests/server.rs index 48a06369..8e97908d 100644 --- a/ntex/tests/server.rs +++ b/ntex/tests/server.rs @@ -92,6 +92,7 @@ async fn test_run() { }) }) .unwrap() + .set_tag("test", "SRV") .run(); let _ = tx.send((srv, ntex::rt::System::current())); Ok(())