From 4c1bc3249b3b987ce916e1d9c216bf276447cc95 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 9 Mar 2025 18:11:33 +0500 Subject: [PATCH] Experimental poll based runtime (#510) --- .github/workflows/cov.yml | 7 +- .github/workflows/linux.yml | 5 +- .github/workflows/osx.yml | 4 + Cargo.toml | 30 ++ ntex-io/CHANGES.md | 4 + ntex-io/Cargo.toml | 2 +- ntex-io/src/buf.rs | 21 ++ ntex-io/src/dispatcher.rs | 2 + ntex-io/src/flags.rs | 10 + ntex-io/src/lib.rs | 4 +- ntex-io/src/tasks.rs | 321 ++++++++++++++++++- ntex-iodriver/Cargo.toml | 81 +++++ ntex-iodriver/LICENSE-APACHE | 1 + ntex-iodriver/LICENSE-MIT | 1 + ntex-iodriver/README.md | 13 + ntex-iodriver/src/asyncify.rs | 128 ++++++++ ntex-iodriver/src/driver_type.rs | 115 +++++++ ntex-iodriver/src/key.rs | 223 +++++++++++++ ntex-iodriver/src/lib.rs | 516 +++++++++++++++++++++++++++++++ ntex-iodriver/src/op.rs | 75 +++++ ntex-iodriver/src/poll/mod.rs | 459 +++++++++++++++++++++++++++ ntex-iodriver/src/poll/op.rs | 68 ++++ ntex-iodriver/src/unix/mod.rs | 15 + ntex-iodriver/src/unix/op.rs | 40 +++ ntex-net/Cargo.toml | 20 +- ntex-net/src/compat.rs | 20 ++ ntex-net/src/lib.rs | 9 + ntex-net/src/rt/connect.rs | 196 ++++++++++++ ntex-net/src/rt/driver.rs | 362 ++++++++++++++++++++++ ntex-net/src/rt/io.rs | 107 +++++++ ntex-net/src/rt/mod.rs | 60 ++++ ntex-rt/Cargo.toml | 8 +- ntex-rt/build.rs | 1 + ntex-rt/src/lib.rs | 160 ++++++++++ ntex-runtime/Cargo.toml | 58 ++++ ntex-runtime/src/lib.rs | 13 + ntex-runtime/src/net/mod.rs | 11 + ntex-runtime/src/net/socket.rs | 226 ++++++++++++++ ntex-runtime/src/net/tcp.rs | 50 +++ ntex-runtime/src/net/unix.rs | 98 ++++++ ntex-runtime/src/op.rs | 39 +++ ntex-runtime/src/rt.rs | 431 ++++++++++++++++++++++++++ ntex/Cargo.toml | 11 +- ntex/tests/http_awc_client.rs | 10 +- ntex/tests/server.rs | 5 +- ntex/tests/web_server.rs | 6 +- 46 files changed, 4016 insertions(+), 30 deletions(-) create mode 100644 ntex-iodriver/Cargo.toml create mode 120000 ntex-iodriver/LICENSE-APACHE create mode 120000 ntex-iodriver/LICENSE-MIT create mode 100644 ntex-iodriver/README.md create mode 100644 ntex-iodriver/src/asyncify.rs create mode 100644 ntex-iodriver/src/driver_type.rs create mode 100644 ntex-iodriver/src/key.rs create mode 100644 ntex-iodriver/src/lib.rs create mode 100644 ntex-iodriver/src/op.rs create mode 100644 ntex-iodriver/src/poll/mod.rs create mode 100644 ntex-iodriver/src/poll/op.rs create mode 100644 ntex-iodriver/src/unix/mod.rs create mode 100644 ntex-iodriver/src/unix/op.rs create mode 100644 ntex-net/src/rt/connect.rs create mode 100644 ntex-net/src/rt/driver.rs create mode 100644 ntex-net/src/rt/io.rs create mode 100644 ntex-net/src/rt/mod.rs create mode 100644 ntex-runtime/Cargo.toml create mode 100644 ntex-runtime/src/lib.rs create mode 100644 ntex-runtime/src/net/mod.rs create mode 100644 ntex-runtime/src/net/socket.rs create mode 100644 ntex-runtime/src/net/tcp.rs create mode 100644 ntex-runtime/src/net/unix.rs create mode 100644 ntex-runtime/src/op.rs create mode 100644 ntex-runtime/src/rt.rs diff --git a/.github/workflows/cov.yml b/.github/workflows/cov.yml index be265444..a57edd23 100644 --- a/.github/workflows/cov.yml +++ b/.github/workflows/cov.yml @@ -26,16 +26,15 @@ jobs: - name: Clean coverage results run: cargo llvm-cov clean --workspace - - name: Code coverage (glommio) - continue-on-error: true - run: cargo llvm-cov --no-report --all --no-default-features --features="ntex/glommio,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli" -- --skip test_unhandled_data 
- - name: Code coverage (tokio) run: cargo llvm-cov --no-report --all --no-default-features --features="ntex/tokio,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli" - name: Code coverage (compio) run: cargo llvm-cov --no-report --all --no-default-features --features="ntex/compio,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli" + - name: Code coverage (default) + run: cargo llvm-cov --no-report --all --no-default-features --features="ntex/default-rt,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli" + - name: Generate coverage report run: cargo llvm-cov report --lcov --output-path lcov.info --ignore-filename-regex="ntex-compio|ntex-tokio|ntex-glommio|ntex-async-std" diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 863352e0..619d0cdc 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -54,11 +54,10 @@ jobs: run: | cargo test --all --no-default-features --features="ntex/compio,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli" - - name: Run tests (async-std) + - name: Run tests (default) timeout-minutes: 40 - continue-on-error: true run: | - cargo test --all --no-default-features --features="ntex/async-std,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli" + cargo test --all --no-default-features --features="ntex/default-rt,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli" - name: Install cargo-cache continue-on-error: true diff --git a/.github/workflows/osx.yml b/.github/workflows/osx.yml index 5474c552..c01826de 100644 --- a/.github/workflows/osx.yml +++ b/.github/workflows/osx.yml @@ -44,6 +44,10 @@ jobs: timeout-minutes: 40 run: cargo test --all --no-default-features --no-fail-fast --features="ntex/compio,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli" + - name: Run tests (default) + timeout-minutes: 40 + run: cargo test --all --no-default-features --no-fail-fast --features="ntex/default-rt,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli" + - name: Install cargo-cache continue-on-error: true run: cargo install cargo-cache --no-default-features --features ci-autoclean diff --git a/Cargo.toml b/Cargo.toml index 13846071..7461076d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,9 +5,11 @@ members = [ "ntex-bytes", "ntex-codec", "ntex-io", + "ntex-iodriver", "ntex-http", "ntex-router", "ntex-rt", + "ntex-runtime", "ntex-net", "ntex-server", "ntex-service", @@ -21,15 +23,25 @@ members = [ "ntex-tokio", ] +[workspace.package] +authors = ["ntex contributors "] +repository = "https://github.com/ntex-rs/ntex" +documentation = "https://docs.rs/ntex/" +license = "MIT OR Apache-2.0" +edition = "2021" +rust-version = "1.75" + [patch.crates-io] ntex = { path = "ntex" } ntex-bytes = { path = "ntex-bytes" } ntex-codec = { path = "ntex-codec" } ntex-io = { path = "ntex-io" } +ntex-iodriver = { path = "ntex-iodriver" } ntex-net = { path = "ntex-net" } ntex-http = { path = "ntex-http" } ntex-router = { path = "ntex-router" } ntex-rt = { path = "ntex-rt" } +ntex-runtime = { path = "ntex-runtime" } ntex-server = { path = "ntex-server" } ntex-service = { path = "ntex-service" } ntex-tls = { path = "ntex-tls" } @@ -40,3 +52,21 @@ ntex-compio = { path = "ntex-compio" } ntex-glommio = { path = "ntex-glommio" } ntex-tokio = { path = "ntex-tokio" } ntex-async-std = { path = "ntex-async-std" } + 
+[workspace.dependencies] +async-task = "4.5.0" +bitflags = "2" +cfg_aliases = "0.2.1" +cfg-if = "1.0.0" +crossbeam-channel = "0.5.8" +crossbeam-queue = "0.3.8" +futures-util = "0.3.29" +fxhash = "0.2" +libc = "0.2.164" +log = "0.4" +nohash-hasher = "0.2.0" +scoped-tls = "1.0.1" +slab = "0.4.9" +socket2 = "0.5.6" +windows-sys = "0.52.0" +thiserror = "1" diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 508c0ada..ff7201c9 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.11.0] - 2025-03-10 + +* Add single io context + ## [2.10.0] - 2025-02-26 * Impl Filter for Sealed #506 diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index d8ab4eed..40d4ed20 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "2.10.0" +version = "2.11.0" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/buf.rs b/ntex-io/src/buf.rs index fae543bb..e3d701d3 100644 --- a/ntex-io/src/buf.rs +++ b/ntex-io/src/buf.rs @@ -152,6 +152,27 @@ impl Stack { } } + pub(crate) fn with_read_source(&self, io: &IoRef, f: F) -> R + where + F: FnOnce(&mut BytesVec) -> R, + { + let item = self.get_last_level(); + let mut rb = item.0.take(); + if rb.is_none() { + rb = Some(io.memory_pool().get_read_buf()); + } + + let result = f(rb.as_mut().unwrap()); + if let Some(b) = rb { + if b.is_empty() { + io.memory_pool().release_read_buf(b); + } else { + item.0.set(Some(b)); + } + } + result + } + pub(crate) fn with_read_destination(&self, io: &IoRef, f: F) -> R where F: FnOnce(&mut BytesVec) -> R, diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index 9efab367..4c03d312 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -1244,6 +1244,8 @@ mod tests { sleep(Millis(50)).await; if let DispatchItem::Item(msg) = msg { Ok::<_, ()>(Some(msg.freeze())) + } else if let DispatchItem::Disconnect(_) = msg { + Ok::<_, ()>(None) } else { panic!() } diff --git a/ntex-io/src/flags.rs b/ntex-io/src/flags.rs index bc9b5aac..565f20c0 100644 --- a/ntex-io/src/flags.rs +++ b/ntex-io/src/flags.rs @@ -25,6 +25,8 @@ bitflags::bitflags! 
{ /// write task paused const WR_PAUSED = 0b0000_0100_0000_0000; + /// wait for write completion task + const WR_TASK_WAIT = 0b0000_1000_0000_0000; /// dispatcher is marked stopped const DSP_STOP = 0b0001_0000_0000_0000; @@ -38,6 +40,10 @@ impl Flags { self.intersects(Flags::IO_STOPPED) } + pub(crate) fn is_task_waiting_for_write(&self) -> bool { + self.contains(Flags::WR_TASK_WAIT) + } + pub(crate) fn is_waiting_for_write(&self) -> bool { self.intersects(Flags::BUF_W_MUST_FLUSH | Flags::BUF_W_BACKPRESSURE) } @@ -46,6 +52,10 @@ impl Flags { self.remove(Flags::BUF_W_MUST_FLUSH | Flags::BUF_W_BACKPRESSURE); } + pub(crate) fn task_waiting_for_write_is_done(&mut self) { + self.remove(Flags::WR_TASK_WAIT); + } + pub(crate) fn is_read_buf_ready(&self) -> bool { self.contains(Flags::BUF_R_READY) } diff --git a/ntex-io/src/lib.rs b/ntex-io/src/lib.rs index cbfde011..6d4b6bdd 100644 --- a/ntex-io/src/lib.rs +++ b/ntex-io/src/lib.rs @@ -29,7 +29,7 @@ pub use self::filter::{Base, Filter, Layer}; pub use self::framed::Framed; pub use self::io::{Io, IoRef, OnDisconnect}; pub use self::seal::{IoBoxed, Sealed}; -pub use self::tasks::{ReadContext, WriteContext, WriteContextBuf}; +pub use self::tasks::{IoContext, ReadContext, WriteContext, WriteContextBuf}; pub use self::timer::TimerHandle; pub use self::utils::{seal, Decoded}; @@ -53,7 +53,9 @@ pub trait AsyncWrite { /// Status for read task #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] pub enum ReadStatus { + /// Read task is clear to proceed with read operation Ready, + /// Terminate read task Terminate, } diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 4994595c..ae67263f 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -1,6 +1,6 @@ -use std::{cell::Cell, fmt, future::poll_fn, io, task::Context, task::Poll}; +use std::{cell::Cell, fmt, future::poll_fn, io, task::ready, task::Context, task::Poll}; -use ntex_bytes::{BufMut, BytesVec}; +use ntex_bytes::{Buf, BufMut, BytesVec}; use ntex_util::{future::lazy, future::select, future::Either, time::sleep, time::Sleep}; use crate::{AsyncRead, AsyncWrite, Flags, IoRef, ReadStatus, WriteStatus}; @@ -19,6 +19,13 @@ impl ReadContext { Self(io.clone(), Cell::new(None)) } + #[doc(hidden)] + #[inline] + /// Io tag + pub fn context(&self) -> IoContext { + IoContext::new(&self.0) + } + #[inline] /// Io tag pub fn tag(&self) -> &'static str { @@ -342,3 +349,313 @@ impl WriteContextBuf { } } } + +/// Context for io read task +pub struct IoContext(IoRef); + +impl fmt::Debug for IoContext { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("IoContext").field("io", &self.0).finish() + } +} + +impl IoContext { + pub(crate) fn new(io: &IoRef) -> Self { + Self(io.clone()) + } + + #[inline] + /// Io tag + pub fn tag(&self) -> &'static str { + self.0.tag() + } + + #[doc(hidden)] + /// Io flags + pub fn flags(&self) -> crate::flags::Flags { + self.0.flags() + } + + #[inline] + /// Check readiness for read operations + pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll { + self.shutdown_filters(); + self.0.filter().poll_read_ready(cx) + } + + #[inline] + /// Check readiness for write operations + pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll { + self.0.filter().poll_write_ready(cx) + } + + #[inline] + /// Get io error + pub fn stopped(&self, e: Option) { + self.0 .0.io_stopped(e); + } + + /// Wait when io get closed or preparing for close + pub async fn shutdown(&self, flush_buf: bool) { + let st = &self.0 .0; + let mut timeout = None; + + 
poll_fn(|cx| { + let flags = self.0.flags(); + + if flags.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) { + Poll::Ready(()) + } else { + st.write_task.register(cx.waker()); + if flags.contains(Flags::IO_STOPPING_FILTERS) { + if timeout.is_none() { + timeout = Some(sleep(st.disconnect_timeout.get())); + } + if timeout.as_ref().unwrap().poll_elapsed(cx).is_ready() { + st.dispatch_task.wake(); + st.insert_flags(Flags::IO_STOPPING); + return Poll::Ready(()); + } + } + Poll::Pending + } + }) + .await; + + if flush_buf && !self.0.flags().contains(Flags::WR_PAUSED) { + st.insert_flags(Flags::WR_TASK_WAIT); + + poll_fn(|cx| { + let flags = self.0.flags(); + + if flags.intersects(Flags::WR_PAUSED | Flags::IO_STOPPED) { + Poll::Ready(()) + } else { + st.write_task.register(cx.waker()); + + if timeout.is_none() { + timeout = Some(sleep(st.disconnect_timeout.get())); + } + if timeout.as_ref().unwrap().poll_elapsed(cx).is_ready() { + Poll::Ready(()) + } else { + Poll::Pending + } + } + }) + .await; + } + } + + /// Get read buffer + pub fn with_read_buf(&self, f: F) -> Poll<()> + where + F: FnOnce(&mut BytesVec) -> Poll>, + { + let inner = &self.0 .0; + let (hw, lw) = self.0.memory_pool().read_params().unpack(); + let result = inner.buffer.with_read_source(&self.0, |buf| { + // make sure we've got room + let remaining = buf.remaining_mut(); + if remaining < lw { + buf.reserve(hw - remaining); + } + + f(buf) + }); + + // handle buffer changes + match result { + Poll::Ready(Ok(0)) => { + inner.io_stopped(None); + Poll::Ready(()) + } + Poll::Ready(Ok(nbytes)) => { + let filter = self.0.filter(); + let _ = filter + .process_read_buf(&self.0, &inner.buffer, 0, nbytes) + .and_then(|status| { + if status.nbytes > 0 { + // dest buffer has new data, wake up dispatcher + if inner.buffer.read_destination_size() >= hw { + log::trace!( + "{}: Io read buffer is too large {}, enable read back-pressure", + self.0.tag(), + nbytes + ); + inner.insert_flags(Flags::BUF_R_READY | Flags::BUF_R_FULL); + } else { + inner.insert_flags(Flags::BUF_R_READY); + + if nbytes >= hw { + // read task is paused because of read back-pressure + // but there is no new data in top most read buffer + // so we need to wake up read task to read more data + // otherwise read task would sleep forever + inner.read_task.wake(); + } + } + log::trace!( + "{}: New {} bytes available, wakeup dispatcher", + self.0.tag(), + nbytes + ); + inner.dispatch_task.wake(); + } else { + if nbytes >= hw { + // read task is paused because of read back-pressure + // but there is no new data in top most read buffer + // so we need to wake up read task to read more data + // otherwise read task would sleep forever + inner.read_task.wake(); + } + if inner.flags.get().contains(Flags::RD_NOTIFY) { + // in case of "notify" we must wake up dispatch task + // if we read any data from source + inner.dispatch_task.wake(); + } + } + + // while reading, filter wrote some data + // in that case filters need to process write buffers + // and potentialy wake write task + if status.need_write { + filter.process_write_buf(&self.0, &inner.buffer, 0) + } else { + Ok(()) + } + }) + .map_err(|err| { + inner.dispatch_task.wake(); + inner.io_stopped(Some(err)); + inner.insert_flags(Flags::BUF_R_READY); + }); + Poll::Pending + } + Poll::Ready(Err(e)) => { + inner.io_stopped(Some(e)); + Poll::Ready(()) + } + Poll::Pending => { + self.shutdown_filters(); + Poll::Pending + } + } + } + + /// Get write buffer + pub fn with_write_buf(&self, f: F) -> Poll<()> + where + F: FnOnce(&BytesVec) 
-> Poll>, + { + let inner = &self.0 .0; + let result = inner.buffer.with_write_destination(&self.0, |buf| { + let Some(buf) = + buf.and_then(|buf| if buf.is_empty() { None } else { Some(buf) }) + else { + return Poll::Ready(Ok(0)); + }; + + match ready!(f(buf)) { + Ok(0) => { + log::trace!("{}: Disconnected during flush", self.tag()); + Poll::Ready(Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write frame to transport", + ))) + } + Ok(n) => { + if n == buf.len() { + buf.clear(); + Poll::Ready(Ok(0)) + } else { + buf.advance(n); + Poll::Ready(Ok(buf.len())) + } + } + Err(e) => Poll::Ready(Err(e)), + } + }); + + let mut flags = inner.flags.get(); + + let result = match result { + Poll::Pending => { + flags.remove(Flags::WR_PAUSED); + Poll::Pending + } + Poll::Ready(Ok(0)) => { + // all data has been written + flags.insert(Flags::WR_PAUSED); + + if flags.is_task_waiting_for_write() { + flags.task_waiting_for_write_is_done(); + inner.write_task.wake(); + } + + if flags.is_waiting_for_write() { + flags.waiting_for_write_is_done(); + inner.dispatch_task.wake(); + } + Poll::Ready(()) + } + Poll::Ready(Ok(len)) => { + // if write buffer is smaller than high watermark value, turn off back-pressure + if flags.contains(Flags::BUF_W_BACKPRESSURE) + && len < inner.pool.get().write_params_high() << 1 + { + flags.remove(Flags::BUF_W_BACKPRESSURE); + inner.dispatch_task.wake(); + } + Poll::Pending + } + Poll::Ready(Err(e)) => { + self.0 .0.io_stopped(Some(e)); + Poll::Ready(()) + } + }; + + inner.flags.set(flags); + result + } + + fn shutdown_filters(&self) { + let io = &self.0; + let st = &self.0 .0; + if st.flags.get().contains(Flags::IO_STOPPING_FILTERS) { + let flags = st.flags.get(); + + if !flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) { + let filter = io.filter(); + match filter.shutdown(io, &st.buffer, 0) { + Ok(Poll::Ready(())) => { + st.dispatch_task.wake(); + st.insert_flags(Flags::IO_STOPPING); + } + Ok(Poll::Pending) => { + // check read buffer, if buffer is not consumed it is unlikely + // that filter will properly complete shutdown + if flags.contains(Flags::RD_PAUSED) + || flags.contains(Flags::BUF_R_FULL | Flags::BUF_R_READY) + { + st.dispatch_task.wake(); + st.insert_flags(Flags::IO_STOPPING); + } + } + Err(err) => { + st.io_stopped(Some(err)); + } + } + if let Err(err) = filter.process_write_buf(io, &st.buffer, 0) { + st.io_stopped(Some(err)); + } + } + } + } +} + +impl Clone for IoContext { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} diff --git a/ntex-iodriver/Cargo.toml b/ntex-iodriver/Cargo.toml new file mode 100644 index 00000000..2bd61049 --- /dev/null +++ b/ntex-iodriver/Cargo.toml @@ -0,0 +1,81 @@ +[package] +name = "ntex-iodriver" +version = "0.1.0" +description = "Low-level driver for ntex" +categories = ["asynchronous"] +keywords = ["async", "iocp", "io-uring"] +edition = { workspace = true } +authors = { workspace = true } +readme = { workspace = true } +license = { workspace = true } +repository = { workspace = true } + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] +default-target = "x86_64-unknown-linux-gnu" +targets = [ + "x86_64-pc-windows-gnu", + "x86_64-unknown-linux-gnu", + "x86_64-apple-darwin", + "aarch64-apple-ios", + "aarch64-linux-android", + "x86_64-unknown-dragonfly", + "x86_64-unknown-freebsd", + "x86_64-unknown-illumos", + "x86_64-unknown-netbsd", + "x86_64-unknown-openbsd", +] + +[dependencies] +bitflags = { workspace = true } +log = { workspace = true } +cfg-if = { workspace = 
true } +crossbeam-channel = { workspace = true } +socket2 = { workspace = true } +slab = { workspace = true } +nohash-hasher = { workspace = true } + +# Windows specific dependencies +[target.'cfg(windows)'.dependencies] +aligned-array = "1.0.1" +windows-sys = { workspace = true, features = [ + "Win32_Foundation", + "Win32_Networking_WinSock", + "Win32_Security", + "Win32_Storage_FileSystem", + "Win32_System_Console", + "Win32_System_IO", + "Win32_System_Pipes", + "Win32_System_SystemServices", + "Win32_System_Threading", + "Win32_System_WindowsProgramming", +] } + +# Linux specific dependencies +[target.'cfg(target_os = "linux")'.dependencies] +io-uring = { version = "0.7.0", optional = true } +polling = { version = "3.3.0", optional = true } + +# Other platform dependencies +[target.'cfg(all(not(target_os = "linux"), unix))'.dependencies] +polling = "3.3.0" + +[target.'cfg(unix)'.dependencies] +crossbeam-channel = { workspace = true } +crossbeam-queue = { workspace = true } +libc = { workspace = true } + +[build-dependencies] +cfg_aliases = { workspace = true } + +[features] +default = ["polling"] +polling = ["dep:polling"] + +io-uring-sqe128 = [] +io-uring-cqe32 = [] +io-uring-socket = [] + +iocp-global = [] +iocp-wait-packet = [] diff --git a/ntex-iodriver/LICENSE-APACHE b/ntex-iodriver/LICENSE-APACHE new file mode 120000 index 00000000..965b606f --- /dev/null +++ b/ntex-iodriver/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/ntex-iodriver/LICENSE-MIT b/ntex-iodriver/LICENSE-MIT new file mode 120000 index 00000000..76219eb7 --- /dev/null +++ b/ntex-iodriver/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/ntex-iodriver/README.md b/ntex-iodriver/README.md new file mode 100644 index 00000000..20c2a1c1 --- /dev/null +++ b/ntex-iodriver/README.md @@ -0,0 +1,13 @@ +--- + +# ntex + +[![MIT licensed](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/compio-rs/compio/blob/master/LICENSE) +[![crates.io](https://img.shields.io/crates/v/compio)](https://crates.io/crates/compio) +[![docs.rs](https://img.shields.io/badge/docs.rs-compio-latest)](https://docs.rs/compio) +[![Check](https://github.com/compio-rs/compio/actions/workflows/ci_check.yml/badge.svg)](https://github.com/compio-rs/compio/actions/workflows/ci_check.yml) +[![Test](https://github.com/compio-rs/compio/actions/workflows/ci_test.yml/badge.svg)](https://github.com/compio-rs/compio/actions/workflows/ci_test.yml) +[![Telegram](https://img.shields.io/badge/Telegram-compio--rs-blue?logo=telegram)](https://t.me/compio_rs) + +A specialized runtime for ntex framework with IOCP/io_uring/polling support. +This crate is inspired by [compio](https://github.com/compio-rs/compio/). diff --git a/ntex-iodriver/src/asyncify.rs b/ntex-iodriver/src/asyncify.rs new file mode 100644 index 00000000..f7a782a0 --- /dev/null +++ b/ntex-iodriver/src/asyncify.rs @@ -0,0 +1,128 @@ +use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc}; +use std::{fmt, time::Duration}; + +use crossbeam_channel::{bounded, Receiver, Sender, TrySendError}; + +/// An error that may be emitted when all worker threads are busy. It simply +/// returns the dispatchable value with a convenient [`fmt::Debug`] and +/// [`fmt::Display`] implementation. +#[derive(Copy, Clone, PartialEq, Eq)] +pub struct DispatchError(pub T); + +impl DispatchError { + /// Consume the error, yielding the dispatchable that failed to be sent. 
+ pub fn into_inner(self) -> T { + self.0 + } +} + +impl fmt::Debug for DispatchError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "DispatchError(..)".fmt(f) + } +} + +impl fmt::Display for DispatchError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "all threads are busy".fmt(f) + } +} + +impl std::error::Error for DispatchError {} + +type BoxedDispatchable = Box; + +/// A trait for dispatching a closure. It's implemented for all `FnOnce() + Send +/// + 'static` but may also be implemented for any other types that are `Send` +/// and `'static`. +pub trait Dispatchable: Send + 'static { + /// Run the dispatchable + fn run(self: Box); +} + +impl Dispatchable for F +where + F: FnOnce() + Send + 'static, +{ + fn run(self: Box) { + (*self)() + } +} + +struct CounterGuard(Arc); + +impl Drop for CounterGuard { + fn drop(&mut self) { + self.0.fetch_sub(1, Ordering::AcqRel); + } +} + +fn worker( + receiver: Receiver, + counter: Arc, + timeout: Duration, +) -> impl FnOnce() { + move || { + counter.fetch_add(1, Ordering::AcqRel); + let _guard = CounterGuard(counter); + while let Ok(f) = receiver.recv_timeout(timeout) { + f.run(); + } + } +} + +/// A thread pool to perform blocking operations in other threads. +#[derive(Debug, Clone)] +pub struct AsyncifyPool { + sender: Sender, + receiver: Receiver, + counter: Arc, + thread_limit: usize, + recv_timeout: Duration, +} + +impl AsyncifyPool { + /// Create [`AsyncifyPool`] with thread number limit and channel receive + /// timeout. + pub fn new(thread_limit: usize, recv_timeout: Duration) -> Self { + let (sender, receiver) = bounded(0); + Self { + sender, + receiver, + counter: Arc::new(AtomicUsize::new(0)), + thread_limit, + recv_timeout, + } + } + + /// Send a dispatchable, usually a closure, to another thread. Usually the + /// user should not use it. When all threads are busy and thread number + /// limit has been reached, it will return an error with the original + /// dispatchable. 
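+    ///
+    /// A minimal usage sketch (illustrative only; any `FnOnce() + Send + 'static`
+    /// closure implements [`Dispatchable`]):
+    ///
+    /// ```ignore
+    /// let pool = AsyncifyPool::new(4, std::time::Duration::from_secs(60));
+    /// if let Err(err) = pool.dispatch(|| println!("runs on a pool thread")) {
+    ///     // All threads are busy and the limit was reached; recover the closure.
+    ///     let _closure = err.into_inner();
+    /// }
+    /// ```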
+ pub fn dispatch(&self, f: D) -> Result<(), DispatchError> { + match self.sender.try_send(Box::new(f) as BoxedDispatchable) { + Ok(_) => Ok(()), + Err(e) => match e { + TrySendError::Full(f) => { + if self.counter.load(Ordering::Acquire) >= self.thread_limit { + // Safety: we can ensure the type + Err(DispatchError(*unsafe { + Box::from_raw(Box::into_raw(f).cast()) + })) + } else { + std::thread::spawn(worker( + self.receiver.clone(), + self.counter.clone(), + self.recv_timeout, + )); + self.sender.send(f).expect("the channel should not be full"); + Ok(()) + } + } + TrySendError::Disconnected(_) => { + unreachable!("receiver should not all disconnected") + } + }, + } + } +} diff --git a/ntex-iodriver/src/driver_type.rs b/ntex-iodriver/src/driver_type.rs new file mode 100644 index 00000000..61d39aa2 --- /dev/null +++ b/ntex-iodriver/src/driver_type.rs @@ -0,0 +1,115 @@ +use std::sync::atomic::{AtomicU8, Ordering}; + +const UNINIT: u8 = u8::MAX; +const IO_URING: u8 = 0; +const POLLING: u8 = 1; +const IOCP: u8 = 2; + +static DRIVER_TYPE: AtomicU8 = AtomicU8::new(UNINIT); + +/// Representing underlying driver type the fusion driver is using +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum DriverType { + /// Using `polling` driver + Poll = POLLING, + + /// Using `io-uring` driver + IoUring = IO_URING, + + /// Using `iocp` driver + IOCP = IOCP, +} + +impl DriverType { + fn from_num(n: u8) -> Self { + match n { + IO_URING => Self::IoUring, + POLLING => Self::Poll, + IOCP => Self::IOCP, + _ => unreachable!("invalid driver type"), + } + } + + /// Get the underlying driver type + fn get() -> DriverType { + cfg_if::cfg_if! { + if #[cfg(windows)] { + DriverType::IOCP + } else if #[cfg(all(target_os = "linux", feature = "polling", feature = "io-uring"))] { + use io_uring::opcode::*; + + // Add more opcodes here if used + const USED_OP: &[u8] = &[ + Read::CODE, + Readv::CODE, + Write::CODE, + Writev::CODE, + Fsync::CODE, + Accept::CODE, + Connect::CODE, + RecvMsg::CODE, + SendMsg::CODE, + AsyncCancel::CODE, + OpenAt::CODE, + Close::CODE, + Shutdown::CODE, + // Linux kernel 5.19 + #[cfg(any( + feature = "io-uring-sqe128", + feature = "io-uring-cqe32", + feature = "io-uring-socket" + ))] + Socket::CODE, + ]; + + (|| { + let uring = io_uring::IoUring::new(2)?; + let mut probe = io_uring::Probe::new(); + uring.submitter().register_probe(&mut probe)?; + if USED_OP.iter().all(|op| probe.is_supported(*op)) { + std::io::Result::Ok(DriverType::IoUring) + } else { + Ok(DriverType::Poll) + } + })() + .unwrap_or(DriverType::Poll) // Should we fail here? + } else if #[cfg(all(target_os = "linux", feature = "io-uring"))] { + DriverType::IoUring + } else if #[cfg(unix)] { + DriverType::Poll + } else { + compile_error!("unsupported platform"); + } + } + } + + /// Get the underlying driver type and cache it. Following calls will return + /// the cached value. 
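+    ///
+    /// On Linux with both the `polling` and `io-uring` features enabled, the first
+    /// call probes the kernel for the io-uring opcodes listed above and falls back
+    /// to the poll driver if probing fails or any opcode is unsupported.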
+ pub fn current() -> DriverType { + match DRIVER_TYPE.load(Ordering::Acquire) { + UNINIT => {} + x => return DriverType::from_num(x), + } + let dev_ty = Self::get(); + + DRIVER_TYPE.store(dev_ty as u8, Ordering::Release); + + dev_ty + } + + /// Check if the current driver is `polling` + pub fn is_polling() -> bool { + Self::current() == DriverType::Poll + } + + /// Check if the current driver is `io-uring` + pub fn is_iouring() -> bool { + Self::current() == DriverType::IoUring + } + + /// Check if the current driver is `iocp` + pub fn is_iocp() -> bool { + Self::current() == DriverType::IOCP + } +} diff --git a/ntex-iodriver/src/key.rs b/ntex-iodriver/src/key.rs new file mode 100644 index 00000000..7f018af8 --- /dev/null +++ b/ntex-iodriver/src/key.rs @@ -0,0 +1,223 @@ +use std::{io, marker::PhantomData, mem::MaybeUninit, pin::Pin, task::Waker}; + +use crate::{OpCode, Overlapped, PushEntry, RawFd}; + +/// An operation with other needed information. It should be allocated on the +/// heap. The pointer to this struct is used as `user_data`, and on Windows, it +/// is used as the pointer to `OVERLAPPED`. +/// +/// `*const RawOp` can be obtained from any `Key` by +/// first casting `Key::user_data` to `*const RawOp<()>`, then upcasted with +/// `upcast_fn`. It is done in [`Key::as_op_pin`]. +#[repr(C)] +pub(crate) struct RawOp { + header: Overlapped, + // The cancelled flag and the result here are manual reference counting. The driver holds the + // strong ref until it completes; the runtime holds the strong ref until the future is + // dropped. + cancelled: bool, + // The metadata in `*mut RawOp` + metadata: usize, + result: PushEntry, io::Result>, + flags: u32, + op: T, +} + +#[repr(C)] +union OpCodePtrRepr { + ptr: *mut RawOp, + components: OpCodePtrComponents, +} + +#[repr(C)] +#[derive(Clone, Copy)] +struct OpCodePtrComponents { + data_pointer: *mut (), + metadata: usize, +} + +fn opcode_metadata() -> usize { + let mut op = MaybeUninit::>::uninit(); + // SAFETY: same as `core::ptr::metadata`. + unsafe { + OpCodePtrRepr { + ptr: op.as_mut_ptr(), + } + .components + .metadata + } +} + +const unsafe fn opcode_dyn_mut(ptr: *mut (), metadata: usize) -> *mut RawOp { + OpCodePtrRepr { + components: OpCodePtrComponents { + metadata, + data_pointer: ptr, + }, + } + .ptr +} + +/// A typed wrapper for key of Ops submitted into driver. It doesn't free the +/// inner on dropping. Instead, the memory is managed by the proactor. The inner +/// is only freed when: +/// +/// 1. The op is completed and the future asks the result. `into_inner` will be +/// called by the proactor. +/// 2. The op is completed and the future cancels it. `into_box` will be called +/// by the proactor. +#[derive(PartialEq, Eq, Hash)] +pub struct Key { + user_data: *mut (), + _p: PhantomData>>, +} + +impl Unpin for Key {} + +impl Key { + /// Create [`RawOp`] and get the [`Key`] to it. + pub(crate) fn new(driver: RawFd, op: T) -> Self { + let header = Overlapped::new(driver); + let raw_op = Box::new(RawOp { + header, + cancelled: false, + metadata: opcode_metadata::(), + result: PushEntry::Pending(None), + flags: 0, + op, + }); + unsafe { Self::new_unchecked(Box::into_raw(raw_op) as _) } + } +} + +impl Key { + /// Create a new `Key` with the given user data. + /// + /// # Safety + /// + /// Caller needs to ensure that `T` does correspond to `user_data` in driver + /// this `Key` is created with. In most cases, it is enough to let `T` be + /// `dyn OpCode`. 
+ pub unsafe fn new_unchecked(user_data: usize) -> Self { + Self { + user_data: user_data as _, + _p: PhantomData, + } + } + + /// Get the unique user-defined data. + pub fn user_data(&self) -> usize { + self.user_data as _ + } + + fn as_opaque(&self) -> &RawOp<()> { + // SAFETY: user_data is unique and RawOp is repr(C). + unsafe { &*(self.user_data as *const RawOp<()>) } + } + + fn as_opaque_mut(&mut self) -> &mut RawOp<()> { + // SAFETY: see `as_opaque`. + unsafe { &mut *(self.user_data as *mut RawOp<()>) } + } + + fn as_dyn_mut_ptr(&mut self) -> *mut RawOp { + let user_data = self.user_data; + let this = self.as_opaque_mut(); + // SAFETY: metadata from `Key::new`. + unsafe { opcode_dyn_mut(user_data, this.metadata) } + } + + /// A pointer to OVERLAPPED. + #[cfg(windows)] + pub(crate) fn as_mut_ptr(&mut self) -> *mut Overlapped { + &mut self.as_opaque_mut().header + } + + /// Cancel the op, decrease the ref count. The return value indicates if the + /// op is completed. If so, the op should be dropped because it is + /// useless. + pub(crate) fn set_cancelled(&mut self) -> bool { + self.as_opaque_mut().cancelled = true; + self.has_result() + } + + /// Complete the op, decrease the ref count. Wake the future if a waker is + /// set. The return value indicates if the op is cancelled. If so, the + /// op should be dropped because it is useless. + pub(crate) fn set_result(&mut self, res: io::Result) -> bool { + let this = unsafe { &mut *self.as_dyn_mut_ptr() }; + #[cfg(all(target_os = "linux", feature = "io-uring"))] + if let Ok(res) = res { + unsafe { + Pin::new_unchecked(&mut this.op).set_result(res); + } + } + if let PushEntry::Pending(Some(w)) = + std::mem::replace(&mut this.result, PushEntry::Ready(res)) + { + w.wake(); + } + this.cancelled + } + + pub(crate) fn set_flags(&mut self, flags: u32) { + self.as_opaque_mut().flags = flags; + } + + pub(crate) fn flags(&self) -> u32 { + self.as_opaque().flags + } + + /// Whether the op is completed. + pub(crate) fn has_result(&self) -> bool { + self.as_opaque().result.is_ready() + } + + /// Set waker of the current future. + pub(crate) fn set_waker(&mut self, waker: Waker) { + if let PushEntry::Pending(w) = &mut self.as_opaque_mut().result { + *w = Some(waker) + } + } + + /// Get the inner [`RawOp`]. It is usually used to drop the inner + /// immediately, without knowing about the inner `T`. + /// + /// # Safety + /// + /// Call it only when the op is cancelled and completed, which is the case + /// when the ref count becomes zero. See doc of [`Key::set_cancelled`] + /// and [`Key::set_result`]. + pub(crate) unsafe fn into_box(mut self) -> Box> { + Box::from_raw(self.as_dyn_mut_ptr()) + } +} + +impl Key { + /// Get the inner result if it is completed. + /// + /// # Safety + /// + /// Call it only when the op is completed, otherwise it is UB. + pub(crate) unsafe fn into_inner(self) -> (io::Result, T) { + let op = unsafe { Box::from_raw(self.user_data as *mut RawOp) }; + (op.result.take_ready().unwrap_unchecked(), op.op) + } +} + +impl Key { + /// Pin the inner op. + pub(crate) fn as_op_pin(&mut self) -> Pin<&mut dyn OpCode> { + // SAFETY: the inner won't be moved. 
+ unsafe { + let this = &mut *self.as_dyn_mut_ptr(); + Pin::new_unchecked(&mut this.op) + } + } +} + +impl std::fmt::Debug for Key { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Key({})", self.user_data()) + } +} diff --git a/ntex-iodriver/src/lib.rs b/ntex-iodriver/src/lib.rs new file mode 100644 index 00000000..c66ff350 --- /dev/null +++ b/ntex-iodriver/src/lib.rs @@ -0,0 +1,516 @@ +//! The platform-specified driver. +//! Some types differ by compilation target. + +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +#![allow(clippy::type_complexity)] + +#[cfg(all( + target_os = "linux", + not(feature = "io-uring"), + not(feature = "polling") +))] +compile_error!( + "You must choose at least one of these features: [\"io-uring\", \"polling\"]" +); + +#[cfg(unix)] +use std::{io, task::Poll, task::Waker, time::Duration}; + +#[cfg(unix)] +mod key; +#[cfg(unix)] +pub use key::Key; + +#[cfg(unix)] +pub mod op; +#[cfg(unix)] +#[cfg_attr(docsrs, doc(cfg(all())))] +mod unix; +#[cfg(unix)] +use unix::Overlapped; + +#[cfg(unix)] +mod asyncify; +#[cfg(unix)] +pub use asyncify::*; + +mod driver_type; +pub use driver_type::*; + +thread_local! { + static LOGGING: std::cell::Cell = const { std::cell::Cell::new(false) }; +} + +/// enable logging for thread +pub fn enable_logging() { + LOGGING.with(|v| v.set(true)); +} + +/// enable logging for thread +pub fn log>(s: T) { + LOGGING.with(|_v| { + //if _v.get() { + println!("{}", s.as_ref()); + //} + }); +} + +cfg_if::cfg_if! { + //if #[cfg(windows)] { + // #[path = "iocp/mod.rs"] + // mod sys; + //} else if #[cfg(all(target_os = "linux", feature = "io-uring"))] { + // #[path = "iour/mod.rs"] + // mod sys; + //} else + if #[cfg(unix)] { + #[path = "poll/mod.rs"] + mod sys; + } +} + +#[cfg(unix)] +pub use sys::*; + +#[cfg(windows)] +#[macro_export] +#[doc(hidden)] +macro_rules! syscall { + (BOOL, $e:expr) => { + $crate::syscall!($e, == 0) + }; + (SOCKET, $e:expr) => { + $crate::syscall!($e, != 0) + }; + (HANDLE, $e:expr) => { + $crate::syscall!($e, == ::windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE) + }; + ($e:expr, $op: tt $rhs: expr) => {{ + #[allow(unused_unsafe)] + let res = unsafe { $e }; + if res $op $rhs { + Err(::std::io::Error::last_os_error()) + } else { + Ok(res) + } + }}; +} + +/// Helper macro to execute a system call +#[cfg(unix)] +#[macro_export] +#[doc(hidden)] +macro_rules! syscall { + (break $e:expr) => { + loop { + match $crate::syscall!($e) { + Ok(fd) => break ::std::task::Poll::Ready(Ok(fd as usize)), + Err(e) if e.kind() == ::std::io::ErrorKind::WouldBlock || e.raw_os_error() == Some(::libc::EINPROGRESS) + => break ::std::task::Poll::Pending, + Err(e) if e.kind() == ::std::io::ErrorKind::Interrupted => {}, + Err(e) => break ::std::task::Poll::Ready(Err(e)), + } + } + }; + ($e:expr, $f:ident($fd:expr)) => { + match $crate::syscall!(break $e) { + ::std::task::Poll::Pending => Ok($crate::sys::Decision::$f($fd)), + ::std::task::Poll::Ready(Ok(res)) => Ok($crate::sys::Decision::Completed(res)), + ::std::task::Poll::Ready(Err(e)) => Err(e), + } + }; + ($e:expr) => {{ + #[allow(unused_unsafe)] + let res = unsafe { $e }; + if res == -1 { + Err(::std::io::Error::last_os_error()) + } else { + Ok(res) + } + }}; +} + +#[macro_export] +#[doc(hidden)] +macro_rules! 
impl_raw_fd { + ($t:ty, $it:ty, $inner:ident) => { + impl $crate::AsRawFd for $t { + fn as_raw_fd(&self) -> $crate::RawFd { + self.$inner.as_raw_fd() + } + } + #[cfg(unix)] + impl std::os::fd::FromRawFd for $t { + unsafe fn from_raw_fd(fd: $crate::RawFd) -> Self { + Self { + $inner: std::os::fd::FromRawFd::from_raw_fd(fd), + } + } + } + }; + ($t:ty, $it:ty, $inner:ident,file) => { + $crate::impl_raw_fd!($t, $it, $inner); + #[cfg(windows)] + impl std::os::windows::io::FromRawHandle for $t { + unsafe fn from_raw_handle(handle: std::os::windows::io::RawHandle) -> Self { + Self { + $inner: std::os::windows::io::FromRawHandle::from_raw_handle(handle), + } + } + } + #[cfg(windows)] + impl std::os::windows::io::AsRawHandle for $t { + fn as_raw_handle(&self) -> std::os::windows::io::RawHandle { + self.$inner.as_raw_handle() + } + } + }; + ($t:ty, $it:ty, $inner:ident,socket) => { + $crate::impl_raw_fd!($t, $it, $inner); + #[cfg(windows)] + impl std::os::windows::io::FromRawSocket for $t { + unsafe fn from_raw_socket(sock: std::os::windows::io::RawSocket) -> Self { + Self { + $inner: std::os::windows::io::FromRawSocket::from_raw_socket(sock), + } + } + } + #[cfg(windows)] + impl std::os::windows::io::AsRawSocket for $t { + fn as_raw_socket(&self) -> std::os::windows::io::RawSocket { + self.$inner.as_raw_socket() + } + } + }; +} + +/// The return type of [`Proactor::push`]. +pub enum PushEntry { + /// The operation is pushed to the submission queue. + Pending(K), + /// The operation is ready and returns. + Ready(R), +} + +impl PushEntry { + /// Get if the current variant is [`PushEntry::Ready`]. + pub const fn is_ready(&self) -> bool { + matches!(self, Self::Ready(_)) + } + + /// Take the ready variant if exists. + pub fn take_ready(self) -> Option { + match self { + Self::Pending(_) => None, + Self::Ready(res) => Some(res), + } + } + + /// Map the [`PushEntry::Pending`] branch. + pub fn map_pending(self, f: impl FnOnce(K) -> L) -> PushEntry { + match self { + Self::Pending(k) => PushEntry::Pending(f(k)), + Self::Ready(r) => PushEntry::Ready(r), + } + } + + /// Map the [`PushEntry::Ready`] branch. + pub fn map_ready(self, f: impl FnOnce(R) -> S) -> PushEntry { + match self { + Self::Pending(k) => PushEntry::Pending(k), + Self::Ready(r) => PushEntry::Ready(f(r)), + } + } +} + +#[cfg(unix)] +/// Low-level actions of completion-based IO. +/// It owns the operations to keep the driver safe. +pub struct Proactor { + driver: Driver, +} + +#[cfg(unix)] +impl Proactor { + /// Create [`Proactor`] with 1024 entries. + pub fn new() -> io::Result { + Self::builder().build() + } + + /// Create [`ProactorBuilder`] to config the proactor. + pub fn builder() -> ProactorBuilder { + ProactorBuilder::new() + } + + fn with_builder(builder: &ProactorBuilder) -> io::Result { + Ok(Self { + driver: Driver::new(builder)?, + }) + } + + /// Attach an fd to the driver. + /// + /// ## Platform specific + /// * IOCP: it will be attached to the completion port. An fd could only be + /// attached to one driver, and could only be attached once, even if you + /// `try_clone` it. + /// * io-uring & polling: it will do nothing but return `Ok(())`. + pub fn attach(&self, fd: RawFd) -> io::Result<()> { + self.driver.attach(fd) + } + + /// Cancel an operation with the pushed user-defined data. + /// + /// The cancellation is not reliable. The underlying operation may continue, + /// but just don't return from [`Proactor::poll`]. Therefore, although an + /// operation is cancelled, you should not reuse its `user_data`. 
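+    ///
+    /// If the operation has already completed, its result and the operation itself
+    /// are returned; otherwise `None` is returned and the allocation is released by
+    /// the driver once the operation finishes.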
+ pub fn cancel(&self, mut op: Key) -> Option<(io::Result, T)> { + if op.set_cancelled() { + // SAFETY: completed. + Some(unsafe { op.into_inner() }) + } else { + None + } + } + + /// Push an operation into the driver, and return the unique key, called + /// user-defined data, associated with it. + pub fn push( + &self, + op: T, + ) -> PushEntry, (io::Result, T)> { + let mut op = self.driver.create_op(op); + match self + .driver + .push(&mut unsafe { Key::::new_unchecked(op.user_data()) }) + { + Poll::Pending => PushEntry::Pending(op), + Poll::Ready(res) => { + op.set_result(res); + // SAFETY: just completed. + PushEntry::Ready(unsafe { op.into_inner() }) + } + } + } + + /// Poll the driver and get completed entries. + /// You need to call [`Proactor::pop`] to get the pushed + /// operations. + pub fn poll(&self, timeout: Option, f: F) -> io::Result<()> { + unsafe { self.driver.poll(timeout, f) } + } + + /// Get the pushed operations from the completion entries. + /// + /// # Panics + /// This function will panic if the requested operation has not been + /// completed. + pub fn pop(&self, op: Key) -> PushEntry, ((io::Result, T), u32)> { + if op.has_result() { + let flags = op.flags(); + // SAFETY: completed. + PushEntry::Ready((unsafe { op.into_inner() }, flags)) + } else { + PushEntry::Pending(op) + } + } + + /// Update the waker of the specified op. + pub fn update_waker(&self, op: &mut Key, waker: Waker) { + op.set_waker(waker); + } + + /// Create a notify handle to interrupt the inner driver. + pub fn handle(&self) -> NotifyHandle { + self.driver.handle() + } + + pub fn register_handler(&self, f: F) + where + F: FnOnce(DriverApi) -> Box, + { + self.driver.register_handler(f) + } +} + +#[cfg(unix)] +impl AsRawFd for Proactor { + fn as_raw_fd(&self) -> RawFd { + self.driver.as_raw_fd() + } +} + +/// An completed entry returned from kernel. +#[cfg(unix)] +#[derive(Debug)] +pub(crate) struct Entry { + user_data: usize, + result: io::Result, + flags: u32, +} + +#[cfg(unix)] +impl Entry { + pub(crate) fn new(user_data: usize, result: io::Result) -> Self { + Self { + user_data, + result, + flags: 0, + } + } + + #[cfg(all(target_os = "linux", feature = "io-uring"))] + // this method only used by in io-uring driver + pub(crate) fn set_flags(&mut self, flags: u32) { + self.flags = flags; + } + + /// The user-defined data returned by [`Proactor::push`]. + pub fn user_data(&self) -> usize { + self.user_data + } + + pub fn flags(&self) -> u32 { + self.flags + } + + /// The result of the operation. + pub fn into_result(self) -> io::Result { + self.result + } + + /// SAFETY: `user_data` should be a valid pointer. + pub unsafe fn notify(self) { + let user_data = self.user_data(); + let mut op = Key::<()>::new_unchecked(user_data); + op.set_flags(self.flags()); + if op.set_result(self.into_result()) { + // SAFETY: completed and cancelled. + let _ = op.into_box(); + } + } +} + +#[cfg(unix)] +#[derive(Debug, Clone)] +enum ThreadPoolBuilder { + Create { limit: usize, recv_limit: Duration }, + Reuse(AsyncifyPool), +} + +#[cfg(unix)] +impl Default for ThreadPoolBuilder { + fn default() -> Self { + Self::new() + } +} + +#[cfg(unix)] +impl ThreadPoolBuilder { + pub fn new() -> Self { + Self::Create { + limit: 256, + recv_limit: Duration::from_secs(60), + } + } + + pub fn create_or_reuse(&self) -> AsyncifyPool { + match self { + Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit), + Self::Reuse(pool) => pool.clone(), + } + } +} + +/// Builder for [`Proactor`]. 
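+///
+/// A minimal configuration sketch (illustrative only; the values shown are arbitrary):
+///
+/// ```ignore
+/// let proactor = Proactor::builder()
+///     .capacity(2048)
+///     .thread_pool_limit(64)
+///     .build()?;
+/// ```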
+#[cfg(unix)] +#[derive(Debug, Clone)] +pub struct ProactorBuilder { + capacity: u32, + pool_builder: ThreadPoolBuilder, + sqpoll_idle: Option, +} + +#[cfg(unix)] +impl Default for ProactorBuilder { + fn default() -> Self { + Self::new() + } +} + +#[cfg(unix)] +impl ProactorBuilder { + /// Create the builder with default config. + pub fn new() -> Self { + Self { + capacity: 1024, + pool_builder: ThreadPoolBuilder::new(), + sqpoll_idle: None, + } + } + + /// Set the capacity of the inner event queue or submission queue, if + /// exists. The default value is 1024. + pub fn capacity(&mut self, capacity: u32) -> &mut Self { + self.capacity = capacity; + self + } + + /// Set the thread number limit of the inner thread pool, if exists. The + /// default value is 256. + /// + /// It will be ignored if `reuse_thread_pool` is set. + pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self { + if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder { + *limit = value; + } + self + } + + /// Set the waiting timeout of the inner thread, if exists. The default is + /// 60 seconds. + /// + /// It will be ignored if `reuse_thread_pool` is set. + pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self { + if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder { + *recv_limit = timeout; + } + self + } + + /// Set to reuse an existing [`AsyncifyPool`] in this proactor. + pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self { + self.pool_builder = ThreadPoolBuilder::Reuse(pool); + self + } + + /// Force reuse the thread pool for each proactor created by this builder, + /// even `reuse_thread_pool` is not set. + pub fn force_reuse_thread_pool(&mut self) -> &mut Self { + self.reuse_thread_pool(self.create_or_get_thread_pool()); + self + } + + /// Create or reuse the thread pool from the config. + pub fn create_or_get_thread_pool(&self) -> AsyncifyPool { + self.pool_builder.create_or_reuse() + } + + /// Set `io-uring` sqpoll idle milliseconds, when `sqpoll_idle` is set, + /// io-uring sqpoll feature will be enabled + /// + /// # Notes + /// + /// - Only effective when the `io-uring` feature is enabled + /// - `idle` must >= 1ms, otherwise will set sqpoll idle 0ms + /// - `idle` will be rounded down + pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self { + self.sqpoll_idle = Some(idle); + self + } + + /// Build the [`Proactor`]. + pub fn build(&self) -> io::Result { + Proactor::with_builder(self) + } +} diff --git a/ntex-iodriver/src/op.rs b/ntex-iodriver/src/op.rs new file mode 100644 index 00000000..c92efc3b --- /dev/null +++ b/ntex-iodriver/src/op.rs @@ -0,0 +1,75 @@ +//! The async operations. +//! +//! Types in this mod represents the low-level operations passed to kernel. +//! The operation itself doesn't perform anything. +//! You need to pass them to [`crate::Proactor`], and poll the driver. + +use std::{io, marker::PhantomPinned, mem::ManuallyDrop, net::Shutdown}; + +#[cfg(unix)] +pub use crate::sys::op::{CreateSocket, Interest}; + +use crate::OwnedFd; + +pub trait Handler { + /// Submitted interest + fn readable(&mut self, id: usize); + + /// Submitted interest + fn writable(&mut self, id: usize); + + /// Operation submission has failed + fn error(&mut self, id: usize, err: io::Error); + + /// All events are processed, process all updates + fn commit(&mut self); +} + +/// Spawn a blocking function in the thread pool. 
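+///
+/// The stored closure runs on the driver's blocking thread pool (`AsyncifyPool`); it
+/// returns the operation result together with a piece of user data, which can be
+/// retrieved afterwards with [`Asyncify::into_inner`].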
+pub struct Asyncify { + pub(crate) f: Option, + pub(crate) data: Option, + _p: PhantomPinned, +} + +impl Asyncify { + /// Create [`Asyncify`]. + pub fn new(f: F) -> Self { + Self { + f: Some(f), + data: None, + _p: PhantomPinned, + } + } + + pub fn into_inner(mut self) -> D { + self.data.take().expect("the data should not be None") + } +} + +/// Shutdown a socket. +pub struct ShutdownSocket { + pub(crate) fd: S, + pub(crate) how: Shutdown, +} + +impl ShutdownSocket { + /// Create [`ShutdownSocket`]. + pub fn new(fd: S, how: Shutdown) -> Self { + Self { fd, how } + } +} + +/// Close socket fd. +pub struct CloseSocket { + pub(crate) fd: ManuallyDrop, +} + +impl CloseSocket { + /// Create [`CloseSocket`]. + pub fn new(fd: OwnedFd) -> Self { + Self { + fd: ManuallyDrop::new(fd), + } + } +} diff --git a/ntex-iodriver/src/poll/mod.rs b/ntex-iodriver/src/poll/mod.rs new file mode 100644 index 00000000..7305e7be --- /dev/null +++ b/ntex-iodriver/src/poll/mod.rs @@ -0,0 +1,459 @@ +#![allow(clippy::type_complexity)] +pub use std::os::fd::{AsRawFd, OwnedFd, RawFd}; + +use std::{cell::Cell, cell::RefCell, io, rc::Rc, sync::Arc}; +use std::{num::NonZeroUsize, os::fd::BorrowedFd, pin::Pin, task::Poll, time::Duration}; + +use crossbeam_queue::SegQueue; +use nohash_hasher::IntMap; +use polling::{Event, Events, Poller}; + +use crate::{op::Handler, op::Interest, AsyncifyPool, Entry, Key, ProactorBuilder}; + +pub(crate) mod op; + +/// Abstraction of operations. +pub trait OpCode { + /// Perform the operation before submit, and return [`Decision`] to + /// indicate whether submitting the operation to polling is required. + fn pre_submit(self: Pin<&mut Self>) -> io::Result; + + /// Perform the operation after received corresponding + /// event. If this operation is blocking, the return value should be + /// [`Poll::Ready`]. + fn operate(self: Pin<&mut Self>) -> Poll>; +} + +/// Result of [`OpCode::pre_submit`]. +#[non_exhaustive] +pub enum Decision { + /// Instant operation, no need to submit + Completed(usize), + /// Blocking operation, needs to be spawned in another thread + Blocking, +} + +bitflags::bitflags! 
{ + #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] + struct Flags: u8 { + const NEW = 0b0000_0001; + const CHANGED = 0b0000_0010; + } +} + +#[derive(Debug)] +struct FdItem { + flags: Flags, + batch: usize, + read: Option, + write: Option, +} + +impl FdItem { + fn new(batch: usize) -> Self { + Self { + batch, + read: None, + write: None, + flags: Flags::NEW, + } + } + + fn register(&mut self, user_data: usize, interest: Interest) { + self.flags.insert(Flags::CHANGED); + match interest { + Interest::Readable => { + self.read = Some(user_data); + } + Interest::Writable => { + self.write = Some(user_data); + } + } + } + + fn unregister(&mut self, int: Interest) { + let res = match int { + Interest::Readable => self.read.take(), + Interest::Writable => self.write.take(), + }; + if res.is_some() { + self.flags.insert(Flags::CHANGED); + } + } + + fn unregister_all(&mut self) { + if self.read.is_some() || self.write.is_some() { + self.flags.insert(Flags::CHANGED); + } + + let _ = self.read.take(); + let _ = self.write.take(); + } + + fn user_data(&mut self, interest: Interest) -> Option { + match interest { + Interest::Readable => self.read, + Interest::Writable => self.write, + } + } + + fn event(&self, key: usize) -> Event { + let mut event = Event::none(key); + if self.read.is_some() { + event.readable = true; + } + if self.write.is_some() { + event.writable = true; + } + event + } +} + +#[derive(Debug)] +enum Change { + Register { + fd: RawFd, + batch: usize, + user_data: usize, + int: Interest, + }, + Unregister { + fd: RawFd, + batch: usize, + int: Interest, + }, + UnregisterAll { + fd: RawFd, + batch: usize, + }, + Blocking { + user_data: usize, + }, +} + +pub struct DriverApi { + batch: usize, + changes: Rc>>, +} + +impl DriverApi { + pub fn register(&self, fd: RawFd, user_data: usize, int: Interest) { + log::debug!( + "Register interest {:?} for {:?} user-data: {:?}", + int, + fd, + user_data + ); + self.change(Change::Register { + fd, + batch: self.batch, + user_data, + int, + }); + } + + pub fn unregister(&self, fd: RawFd, int: Interest) { + log::debug!( + "Unregister interest {:?} for {:?} batch: {:?}", + int, + fd, + self.batch + ); + self.change(Change::Unregister { + fd, + batch: self.batch, + int, + }); + } + + pub fn unregister_all(&self, fd: RawFd) { + self.change(Change::UnregisterAll { + fd, + batch: self.batch, + }); + } + + fn change(&self, change: Change) { + self.changes.borrow_mut().push(change); + } +} + +/// Low-level driver of polling. 
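+///
+/// Event handlers are registered in batches via [`Driver::register_handler`]; each
+/// batch receives a [`DriverApi`] through which it queues `Change` entries
+/// (register/unregister interest, blocking operations). Queued changes are applied
+/// to the inner [`Poller`] in `apply_changes`, readiness events are dispatched to
+/// the matching [`Handler`], and blocking operations are offloaded to the
+/// [`AsyncifyPool`].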
+pub(crate) struct Driver { + poll: Arc, + events: RefCell, + registry: RefCell>, + pool: AsyncifyPool, + pool_completed: Arc>, + hid: Cell, + changes: Rc>>, + handlers: Cell>>>>, +} + +impl Driver { + pub fn new(builder: &ProactorBuilder) -> io::Result { + log::trace!("New poll driver"); + let entries = builder.capacity as usize; // for the sake of consistency, use u32 like iour + let events = if entries == 0 { + Events::new() + } else { + Events::with_capacity(NonZeroUsize::new(entries).unwrap()) + }; + + Ok(Self { + poll: Arc::new(Poller::new()?), + events: RefCell::new(events), + registry: RefCell::new(Default::default()), + pool: builder.create_or_get_thread_pool(), + pool_completed: Arc::new(SegQueue::new()), + hid: Cell::new(0), + changes: Rc::new(RefCell::new(Vec::with_capacity(16))), + handlers: Cell::new(Some(Box::new(Vec::default()))), + }) + } + + pub fn register_handler(&self, f: F) + where + F: FnOnce(DriverApi) -> Box, + { + let id = self.hid.get(); + let mut handlers = self.handlers.take().unwrap_or_default(); + + let api = DriverApi { + batch: id, + changes: self.changes.clone(), + }; + handlers.push(f(api)); + self.hid.set(id + 1); + self.handlers.set(Some(handlers)); + } + + pub fn create_op(&self, op: T) -> Key { + Key::new(self.as_raw_fd(), op) + } + + pub fn attach(&self, _fd: RawFd) -> io::Result<()> { + Ok(()) + } + + pub fn push(&self, op: &mut Key) -> Poll> { + let user_data = op.user_data(); + let op_pin = op.as_op_pin(); + match op_pin.pre_submit()? { + Decision::Completed(res) => Poll::Ready(Ok(res)), + Decision::Blocking => { + self.changes + .borrow_mut() + .push(Change::Blocking { user_data }); + Poll::Pending + } + } + } + + pub unsafe fn poll( + &self, + timeout: Option, + f: F, + ) -> io::Result<()> { + if self.poll_blocking() { + f(); + self.apply_changes()?; + return Ok(()); + } + + let mut events = self.events.borrow_mut(); + self.poll.wait(&mut events, timeout)?; + + if events.is_empty() { + if timeout.is_some() && timeout != Some(Duration::ZERO) { + return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)); + } + } else { + // println!("POLL, events: {:?}", events.len()); + let mut registry = self.registry.borrow_mut(); + let mut handlers = self.handlers.take().unwrap(); + for event in events.iter() { + let fd = event.key as RawFd; + log::debug!("Event {:?} for {:?}", event, registry.get(&fd)); + + if let Some(item) = registry.get_mut(&fd) { + if event.readable { + if let Some(user_data) = item.user_data(Interest::Readable) { + handlers[item.batch].readable(user_data) + } + } + if event.writable { + if let Some(user_data) = item.user_data(Interest::Writable) { + handlers[item.batch].writable(user_data) + } + } + } + } + self.handlers.set(Some(handlers)); + } + + // apply changes + self.apply_changes()?; + + // complete batch handling + let mut handlers = self.handlers.take().unwrap(); + for handler in handlers.iter_mut() { + handler.commit(); + } + self.handlers.set(Some(handlers)); + self.apply_changes()?; + + // run user function + f(); + + // check if we have more changes from "run" + self.apply_changes()?; + + Ok(()) + } + + /// re-calc driver changes + unsafe fn apply_changes(&self) -> io::Result<()> { + let mut changes = self.changes.borrow_mut(); + if changes.is_empty() { + return Ok(()); + } + log::debug!("Apply driver changes, {:?}", changes.len()); + + let mut registry = self.registry.borrow_mut(); + + for change in &mut *changes { + match change { + Change::Register { + fd, + batch, + user_data, + int, + } => { + let item = 
registry.entry(*fd).or_insert_with(|| FdItem::new(*batch)); + item.register(*user_data, *int); + } + Change::Unregister { fd, batch, int } => { + let item = registry.entry(*fd).or_insert_with(|| FdItem::new(*batch)); + item.unregister(*int); + } + Change::UnregisterAll { fd, batch } => { + let item = registry.entry(*fd).or_insert_with(|| FdItem::new(*batch)); + item.unregister_all(); + } + _ => {} + } + } + + for change in changes.drain(..) { + let fd = match change { + Change::Register { fd, .. } => Some(fd), + Change::Unregister { fd, .. } => Some(fd), + Change::UnregisterAll { fd, .. } => Some(fd), + Change::Blocking { user_data } => { + self.push_blocking(user_data); + None + } + }; + + if let Some(fd) = fd { + if let Some(item) = registry.get_mut(&fd) { + if item.flags.contains(Flags::CHANGED) { + item.flags.remove(Flags::CHANGED); + + let new = item.flags.contains(Flags::NEW); + let renew_event = item.event(fd as usize); + + if !renew_event.readable && !renew_event.writable { + // crate::log(format!("DELETE - {:?}", fd.as_raw_fd())); + registry.remove(&fd); + if !new { + self.poll.delete(BorrowedFd::borrow_raw(fd))?; + } + } else if new { + item.flags.remove(Flags::NEW); + // crate::log(format!("ADD - {:?}", fd.as_raw_fd())); + unsafe { self.poll.add(fd, renew_event)? }; + } else { + // crate::log(format!("MODIFY - {:?} {:?}", fd.as_raw_fd(), renew_event)); + self.poll.modify(BorrowedFd::borrow_raw(fd), renew_event)?; + } + } + } + } + } + + Ok(()) + } + + fn push_blocking(&self, user_data: usize) { + let poll = self.poll.clone(); + let completed = self.pool_completed.clone(); + let mut closure = move || { + let mut op = unsafe { Key::::new_unchecked(user_data) }; + let op_pin = op.as_op_pin(); + let res = match op_pin.operate() { + Poll::Pending => unreachable!("this operation is not non-blocking"), + Poll::Ready(res) => res, + }; + completed.push(Entry::new(user_data, res)); + poll.notify().ok(); + }; + while let Err(e) = self.pool.dispatch(closure) { + closure = e.0; + self.poll_blocking(); + } + } + + fn poll_blocking(&self) -> bool { + if self.pool_completed.is_empty() { + false + } else { + while let Some(entry) = self.pool_completed.pop() { + unsafe { + entry.notify(); + } + } + true + } + } + + /// Get notification handle + pub fn handle(&self) -> NotifyHandle { + NotifyHandle::new(self.poll.clone()) + } +} + +impl AsRawFd for Driver { + fn as_raw_fd(&self) -> RawFd { + self.poll.as_raw_fd() + } +} + +impl Drop for Driver { + fn drop(&mut self) { + for fd in self.registry.borrow().keys() { + unsafe { + let fd = BorrowedFd::borrow_raw(*fd); + self.poll.delete(fd).ok(); + } + } + } +} + +#[derive(Clone)] +/// A notify handle to the inner driver. 
+pub struct NotifyHandle { + poll: Arc, +} + +impl NotifyHandle { + fn new(poll: Arc) -> Self { + Self { poll } + } + + /// Notify the driver + pub fn notify(&self) -> io::Result<()> { + self.poll.notify() + } +} diff --git a/ntex-iodriver/src/poll/op.rs b/ntex-iodriver/src/poll/op.rs new file mode 100644 index 00000000..e848b5de --- /dev/null +++ b/ntex-iodriver/src/poll/op.rs @@ -0,0 +1,68 @@ +use std::{io, marker::Send, os::fd::FromRawFd, os::fd::RawFd, pin::Pin, task::Poll}; + +pub use crate::unix::op::*; + +use super::{AsRawFd, Decision, OpCode}; +use crate::{op::*, syscall}; + +impl OpCode for Asyncify +where + D: Send + 'static, + F: (FnOnce() -> (io::Result, D)) + Send + 'static, +{ + fn pre_submit(self: Pin<&mut Self>) -> io::Result { + Ok(Decision::Blocking) + } + + fn operate(self: Pin<&mut Self>) -> Poll> { + // Safety: self won't be moved + let this = unsafe { self.get_unchecked_mut() }; + let f = this + .f + .take() + .expect("the operate method could only be called once"); + let (res, data) = f(); + this.data = Some(data); + Poll::Ready(res) + } +} + +impl OpCode for CreateSocket { + fn pre_submit(self: Pin<&mut Self>) -> io::Result { + Ok(Decision::Blocking) + } + + fn operate(self: Pin<&mut Self>) -> Poll> { + Poll::Ready(Ok( + syscall!(libc::socket(self.domain, self.socket_type, self.protocol))? as _, + )) + } +} + +impl OpCode for ShutdownSocket { + fn pre_submit(self: Pin<&mut Self>) -> io::Result { + Ok(Decision::Blocking) + } + + fn operate(self: Pin<&mut Self>) -> Poll> { + Poll::Ready(Ok( + syscall!(libc::shutdown(self.fd.as_raw_fd(), self.how()))? as _, + )) + } +} + +impl CloseSocket { + pub fn from_raw_fd(fd: RawFd) -> Self { + Self::new(unsafe { FromRawFd::from_raw_fd(fd) }) + } +} + +impl OpCode for CloseSocket { + fn pre_submit(self: Pin<&mut Self>) -> io::Result { + Ok(Decision::Blocking) + } + + fn operate(self: Pin<&mut Self>) -> Poll> { + Poll::Ready(Ok(syscall!(libc::close(self.fd.as_raw_fd()))? as _)) + } +} diff --git a/ntex-iodriver/src/unix/mod.rs b/ntex-iodriver/src/unix/mod.rs new file mode 100644 index 00000000..ed300d0d --- /dev/null +++ b/ntex-iodriver/src/unix/mod.rs @@ -0,0 +1,15 @@ +//! This mod doesn't actually contain any driver, but meant to provide some +//! common op type and utilities for unix platform (for iour and polling). + +pub(crate) mod op; + +use crate::RawFd; + +/// The overlapped struct for unix needn't contain extra fields. +pub(crate) struct Overlapped; + +impl Overlapped { + pub fn new(_driver: RawFd) -> Self { + Self + } +} diff --git a/ntex-iodriver/src/unix/op.rs b/ntex-iodriver/src/unix/op.rs new file mode 100644 index 00000000..3fde9467 --- /dev/null +++ b/ntex-iodriver/src/unix/op.rs @@ -0,0 +1,40 @@ +use std::net::Shutdown; + +use crate::op::*; + +/// The interest to poll a file descriptor. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Interest { + /// Represents a read operation. + Readable, + /// Represents a write operation. + Writable, +} + +/// Create a socket. +pub struct CreateSocket { + pub(crate) domain: i32, + pub(crate) socket_type: i32, + pub(crate) protocol: i32, +} + +impl CreateSocket { + /// Create [`CreateSocket`]. 
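
All of the poll-backend ops above share one protocol: `pre_submit` decides whether the operation can complete inline or must be handed to the blocking pool, and `operate` performs the single syscall. If this backend grew another blocking op it would plausibly look like the sketch below; the `Fsync` type is hypothetical and the `io::Result<Decision>` / `Poll<io::Result<usize>>` signatures are inferred from the impls in this file.

    use std::{io, os::fd::RawFd, pin::Pin, task::Poll};

    use super::{Decision, OpCode};
    use crate::syscall;

    // Hypothetical additional op: flush a descriptor to stable storage.
    pub struct Fsync {
        fd: RawFd,
    }

    impl OpCode for Fsync {
        fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
            // fsync can stall for a long time, so always defer to the pool
            Ok(Decision::Blocking)
        }

        fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
            // runs on the asyncify pool; the completion is queued and the
            // poller is notified, exactly like the socket ops above
            Poll::Ready(Ok(syscall!(libc::fsync(self.fd))? as _))
        }
    }
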
+ pub fn new(domain: i32, socket_type: i32, protocol: i32) -> Self { + Self { + domain, + socket_type, + protocol, + } + } +} + +impl ShutdownSocket { + pub(crate) fn how(&self) -> i32 { + match self.how { + Shutdown::Write => libc::SHUT_WR, + Shutdown::Read => libc::SHUT_RD, + Shutdown::Both => libc::SHUT_RDWR, + } + } +} diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 6b1cb400..112d84bc 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-net" -version = "2.4.0" +version = "2.5.0" authors = ["ntex contributors "] description = "ntexwork utils for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -30,12 +30,15 @@ glommio = ["ntex-rt/glommio", "ntex-glommio"] # async-std runtime async-std = ["ntex-rt/async-std", "ntex-async-std"] +# default ntex runtime +default-rt = ["ntex-rt/default-rt", "ntex-runtime", "ntex-iodriver", "slab", "socket2"] + [dependencies] ntex-service = "3.3" ntex-bytes = "0.1" ntex-http = "0.1" -ntex-io = "2.8" -ntex-rt = "0.4.21" +ntex-io = "2.11" +ntex-rt = "0.4.25" ntex-util = "2.5" ntex-tokio = { version = "0.5.3", optional = true } @@ -43,8 +46,15 @@ ntex-compio = { version = "0.2.4", optional = true } ntex-glommio = { version = "0.5.2", optional = true } ntex-async-std = { version = "0.5.1", optional = true } -log = "0.4" -thiserror = "1" +ntex-runtime = { version = "0.1.0", optional = true } +ntex-iodriver = { version = "0.1.0", optional = true } + +bitflags = { workspace = true } +log = { workspace = true } +libc = { workspace = true } +thiserror = { workspace = true } +slab = { workspace = true, optional = true } +socket2 = { workspace = true, optional = true } [dev-dependencies] ntex = "2" diff --git a/ntex-net/src/compat.rs b/ntex-net/src/compat.rs index ad320882..98fa73cb 100644 --- a/ntex-net/src/compat.rs +++ b/ntex-net/src/compat.rs @@ -6,10 +6,23 @@ pub use ntex_tokio::{from_tcp_stream, tcp_connect, tcp_connect_in}; #[cfg(all(unix, feature = "tokio"))] pub use ntex_tokio::{from_unix_stream, unix_connect, unix_connect_in}; +#[cfg(all( + feature = "default-rt", + not(feature = "tokio"), + not(feature = "async-std"), + not(feature = "compio"), + not(feature = "glommio") +))] +pub use crate::rt::{ + from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect, + unix_connect_in, +}; + #[cfg(all( feature = "compio", not(feature = "tokio"), not(feature = "async-std"), + not(feature = "default-rt"), not(feature = "glommio") ))] pub use ntex_compio::{from_tcp_stream, tcp_connect, tcp_connect_in}; @@ -19,6 +32,7 @@ pub use ntex_compio::{from_tcp_stream, tcp_connect, tcp_connect_in}; feature = "compio", not(feature = "tokio"), not(feature = "async-std"), + not(feature = "default-rt"), not(feature = "glommio") ))] pub use ntex_compio::{from_unix_stream, unix_connect, unix_connect_in}; @@ -27,6 +41,7 @@ pub use ntex_compio::{from_unix_stream, unix_connect, unix_connect_in}; feature = "async-std", not(feature = "tokio"), not(feature = "compio"), + not(feature = "default-rt"), not(feature = "glommio") ))] pub use ntex_async_std::{from_tcp_stream, tcp_connect, tcp_connect_in}; @@ -36,6 +51,7 @@ pub use ntex_async_std::{from_tcp_stream, tcp_connect, tcp_connect_in}; feature = "async-std", not(feature = "tokio"), not(feature = "compio"), + not(feature = "default-rt"), not(feature = "glommio") ))] pub use ntex_async_std::{from_unix_stream, unix_connect, unix_connect_in}; @@ -44,6 +60,7 @@ pub use ntex_async_std::{from_unix_stream, unix_connect, unix_connect_in}; feature = 
"glommio", not(feature = "tokio"), not(feature = "compio"), + not(feature = "default-rt"), not(feature = "async-std") ))] pub use ntex_glommio::{from_tcp_stream, tcp_connect, tcp_connect_in}; @@ -53,6 +70,7 @@ pub use ntex_glommio::{from_tcp_stream, tcp_connect, tcp_connect_in}; feature = "glommio", not(feature = "tokio"), not(feature = "compio"), + not(feature = "default-rt"), not(feature = "async-std") ))] pub use ntex_glommio::{from_unix_stream, unix_connect, unix_connect_in}; @@ -61,6 +79,7 @@ pub use ntex_glommio::{from_unix_stream, unix_connect, unix_connect_in}; not(feature = "tokio"), not(feature = "compio"), not(feature = "async-std"), + not(feature = "default-rt"), not(feature = "glommio") ))] mod no_rt { @@ -131,6 +150,7 @@ mod no_rt { not(feature = "tokio"), not(feature = "compio"), not(feature = "async-std"), + not(feature = "default-rt"), not(feature = "glommio") ))] pub use no_rt::*; diff --git a/ntex-net/src/lib.rs b/ntex-net/src/lib.rs index 60a57add..d03d9b35 100644 --- a/ntex-net/src/lib.rs +++ b/ntex-net/src/lib.rs @@ -8,3 +8,12 @@ pub use ntex_io::Io; pub use ntex_rt::{spawn, spawn_blocking}; pub use self::compat::*; + +#[cfg(all( + feature = "default-rt", + not(feature = "tokio"), + not(feature = "async-std"), + not(feature = "compio"), + not(feature = "glommio") +))] +mod rt; diff --git a/ntex-net/src/rt/connect.rs b/ntex-net/src/rt/connect.rs new file mode 100644 index 00000000..d2ad20c4 --- /dev/null +++ b/ntex-net/src/rt/connect.rs @@ -0,0 +1,196 @@ +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::{cell::RefCell, collections::VecDeque, io, path::Path, rc::Rc, task::Poll}; + +use ntex_iodriver::op::{Handler, Interest}; +use ntex_iodriver::{syscall, AsRawFd, DriverApi, RawFd}; +use ntex_runtime::net::{Socket, TcpStream, UnixStream}; +use ntex_runtime::Runtime; +use ntex_util::channel::oneshot::{channel, Sender}; +use slab::Slab; +use socket2::{Protocol, SockAddr, Type}; + +pub(crate) async fn connect(addr: SocketAddr) -> io::Result { + let addr = SockAddr::from(addr); + let socket = if cfg!(windows) { + let bind_addr = if addr.is_ipv4() { + SockAddr::from(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)) + } else if addr.is_ipv6() { + SockAddr::from(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0)) + } else { + return Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "Unsupported address domain.", + )); + }; + Socket::bind(&bind_addr, Type::STREAM, Some(Protocol::TCP)).await? + } else { + Socket::new(addr.domain(), Type::STREAM, Some(Protocol::TCP)).await? + }; + + let (sender, rx) = channel(); + + ConnectOps::current().connect(socket.as_raw_fd(), addr, sender)?; + + rx.await + .map_err(|_| io::Error::new(io::ErrorKind::Other, "IO Driver is gone")) + .and_then(|item| item)?; + + Ok(TcpStream::from_socket(socket)) +} + +pub(crate) async fn connect_unix(path: impl AsRef) -> io::Result { + let addr = SockAddr::unix(path)?; + + #[cfg(windows)] + let socket = { + let new_addr = empty_unix_socket(); + Socket::bind(&new_addr, Type::STREAM, None).await? + }; + #[cfg(unix)] + let socket = { + use socket2::Domain; + Socket::new(Domain::UNIX, Type::STREAM, None).await? 
+ }; + + let (sender, rx) = channel(); + + ConnectOps::current().connect(socket.as_raw_fd(), addr, sender)?; + + rx.await + .map_err(|_| io::Error::new(io::ErrorKind::Other, "IO Driver is gone")) + .and_then(|item| item)?; + + Ok(UnixStream::from_socket(socket)) +} + +#[derive(Clone)] +pub(crate) struct ConnectOps(Rc); + +#[derive(Debug)] +enum Change { + Readable, + Writable, + Error(io::Error), +} + +struct ConnectOpsBatcher { + feed: VecDeque<(usize, Change)>, + inner: Rc, +} + +struct Item { + fd: RawFd, + sender: Sender>, +} + +struct ConnectOpsInner { + api: DriverApi, + connects: RefCell>, +} + +impl ConnectOps { + pub(crate) fn current() -> Self { + Runtime::with_current(|rt| { + if let Some(s) = rt.get::() { + s + } else { + let mut inner = None; + rt.driver().register_handler(|api| { + let ops = Rc::new(ConnectOpsInner { + api, + connects: RefCell::new(Slab::new()), + }); + inner = Some(ops.clone()); + Box::new(ConnectOpsBatcher { + inner: ops, + feed: VecDeque::new(), + }) + }); + + let s = ConnectOps(inner.unwrap()); + rt.insert(s.clone()); + s + } + }) + } + + pub(crate) fn connect( + &self, + fd: RawFd, + addr: SockAddr, + sender: Sender>, + ) -> io::Result { + let result = syscall!(break libc::connect(fd, addr.as_ptr(), addr.len())); + + if let Poll::Ready(res) = result { + res?; + } + + let item = Item { fd, sender }; + let id = self.0.connects.borrow_mut().insert(item); + + self.0.api.register(fd, id, Interest::Writable); + + Ok(id) + } +} + +impl Handler for ConnectOpsBatcher { + fn readable(&mut self, id: usize) { + log::debug!("ConnectFD is readable {:?}", id); + self.feed.push_back((id, Change::Readable)); + } + + fn writable(&mut self, id: usize) { + log::debug!("ConnectFD is writable {:?}", id); + self.feed.push_back((id, Change::Writable)); + } + + fn error(&mut self, id: usize, err: io::Error) { + self.feed.push_back((id, Change::Error(err))); + } + + fn commit(&mut self) { + if self.feed.is_empty() { + return; + } + log::debug!("Commit connect driver changes, num: {:?}", self.feed.len()); + + let mut connects = self.inner.connects.borrow_mut(); + + for (id, change) in self.feed.drain(..) { + if connects.contains(id) { + let item = connects.remove(id); + match change { + Change::Readable => unreachable!(), + Change::Writable => { + let mut err: libc::c_int = 0; + let mut err_len = + std::mem::size_of::() as libc::socklen_t; + + let res = syscall!(libc::getsockopt( + item.fd.as_raw_fd(), + libc::SOL_SOCKET, + libc::SO_ERROR, + &mut err as *mut _ as *mut _, + &mut err_len + )); + + let res = if err == 0 { + res.map(|_| ()) + } else { + Err(io::Error::from_raw_os_error(err)) + }; + + self.inner.api.unregister_all(item.fd); + let _ = item.sender.send(res); + } + Change::Error(err) => { + let _ = item.sender.send(Err(err)); + self.inner.api.unregister_all(item.fd); + } + } + } + } + } +} diff --git a/ntex-net/src/rt/driver.rs b/ntex-net/src/rt/driver.rs new file mode 100644 index 00000000..00461480 --- /dev/null +++ b/ntex-net/src/rt/driver.rs @@ -0,0 +1,362 @@ +use std::{cell::Cell, collections::VecDeque, fmt, io, ptr, rc::Rc, task, task::Poll}; + +use ntex_iodriver::op::{Handler, Interest}; +use ntex_iodriver::{syscall, AsRawFd, DriverApi, RawFd}; +use ntex_runtime::Runtime; +use slab::Slab; + +use ntex_bytes::BufMut; +use ntex_io::IoContext; + +bitflags::bitflags! 
{ + #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] + struct Flags: u8 { + const ERROR = 0b0000_0001; + const RD = 0b0000_0010; + const WR = 0b0000_0100; + } +} + +pub(crate) struct StreamCtl { + id: usize, + inner: Rc>, +} + +struct TcpStreamItem { + io: Option, + fd: RawFd, + context: IoContext, + flags: Flags, + ref_count: usize, +} + +pub(crate) struct CompioOps(Rc>); + +#[derive(Debug)] +enum Change { + Readable, + Writable, + Error(io::Error), +} + +struct CompioOpsBatcher { + feed: VecDeque<(usize, Change)>, + inner: Rc>, +} + +struct CompioOpsInner { + api: DriverApi, + feed: Cell>>, + streams: Cell>>>>, +} + +impl CompioOps { + pub(crate) fn current() -> Self { + Runtime::with_current(|rt| { + if let Some(s) = rt.get::() { + s + } else { + let mut inner = None; + rt.driver().register_handler(|api| { + let ops = Rc::new(CompioOpsInner { + api, + feed: Cell::new(Some(VecDeque::new())), + streams: Cell::new(Some(Box::new(Slab::new()))), + }); + inner = Some(ops.clone()); + Box::new(CompioOpsBatcher { + inner: ops, + feed: VecDeque::new(), + }) + }); + + let s = CompioOps(inner.unwrap()); + rt.insert(s.clone()); + s + } + }) + } + + pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl { + let item = TcpStreamItem { + context, + fd: io.as_raw_fd(), + io: Some(io), + flags: Flags::empty(), + ref_count: 1, + }; + self.with(|streams| { + let id = streams.insert(item); + StreamCtl { + id, + inner: self.0.clone(), + } + }) + } + + fn with(&self, f: F) -> R + where + F: FnOnce(&mut Slab>) -> R, + { + let mut inner = self.0.streams.take().unwrap(); + let result = f(&mut inner); + self.0.streams.set(Some(inner)); + result + } +} + +impl Clone for CompioOps { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl Handler for CompioOpsBatcher { + fn readable(&mut self, id: usize) { + log::debug!("FD is readable {:?}", id); + self.feed.push_back((id, Change::Readable)); + } + + fn writable(&mut self, id: usize) { + log::debug!("FD is writable {:?}", id); + self.feed.push_back((id, Change::Writable)); + } + + fn error(&mut self, id: usize, err: io::Error) { + log::debug!("FD is failed {:?}, err: {:?}", id, err); + self.feed.push_back((id, Change::Error(err))); + } + + fn commit(&mut self) { + if self.feed.is_empty() { + return; + } + log::debug!("Commit changes, num: {:?}", self.feed.len()); + + let mut streams = self.inner.streams.take().unwrap(); + + for (id, change) in self.feed.drain(..) 
{ + match change { + Change::Readable => { + let item = &mut streams[id]; + let result = item.context.with_read_buf(|buf| { + let chunk = buf.chunk_mut(); + let b = chunk.as_mut_ptr(); + Poll::Ready( + task::ready!(syscall!( + break libc::read(item.fd, b as _, chunk.len()) + )) + .inspect(|size| { + unsafe { buf.advance_mut(*size) }; + log::debug!( + "FD: {:?}, SIZE: {:?}, BUF: {:?}", + item.fd, + size, + buf + ); + }), + ) + }); + + if result.is_pending() { + item.flags.insert(Flags::RD); + self.inner.api.register(item.fd, id, Interest::Readable); + } + } + Change::Writable => { + let item = &mut streams[id]; + let result = item.context.with_write_buf(|buf| { + let slice = &buf[..]; + syscall!( + break libc::write(item.fd, slice.as_ptr() as _, slice.len()) + ) + }); + + if result.is_pending() { + item.flags.insert(Flags::WR); + self.inner.api.register(item.fd, id, Interest::Writable); + } + } + Change::Error(err) => { + if let Some(item) = streams.get_mut(id) { + item.context.stopped(Some(err)); + if !item.flags.contains(Flags::ERROR) { + item.flags.insert(Flags::ERROR); + item.flags.remove(Flags::RD | Flags::WR); + self.inner.api.unregister_all(item.fd); + } + } + } + } + } + + // extra + let mut feed = self.inner.feed.take().unwrap(); + for id in feed.drain(..) { + log::debug!("Drop io ({}), {:?}", id, streams[id].fd); + + streams[id].ref_count -= 1; + if streams[id].ref_count == 0 { + let item = streams.remove(id); + if item.io.is_some() { + self.inner.api.unregister_all(item.fd); + } + } + } + + self.inner.feed.set(Some(feed)); + self.inner.streams.set(Some(streams)); + } +} + +pub(crate) trait Closable { + async fn close(self) -> io::Result<()>; +} + +impl StreamCtl { + pub(crate) async fn close(self) -> io::Result<()> + where + T: Closable, + { + if let Some(io) = self.with(|streams| streams[self.id].io.take()) { + io.close().await + } else { + Ok(()) + } + } + + pub(crate) fn with_io(&self, f: F) -> R + where + F: FnOnce(Option<&T>) -> R, + { + self.with(|streams| f(streams[self.id].io.as_ref())) + } + + pub(crate) fn pause_all(&self) { + self.with(|streams| { + let item = &mut streams[self.id]; + + if item.flags.intersects(Flags::RD | Flags::WR) { + log::debug!("Pause all io ({}), {:?}", self.id, item.fd); + item.flags.remove(Flags::RD | Flags::WR); + self.inner.api.unregister_all(item.fd); + } + }) + } + + pub(crate) fn pause_read(&self) { + self.with(|streams| { + let item = &mut streams[self.id]; + + log::debug!("Pause io read ({}), {:?}", self.id, item.fd); + if item.flags.contains(Flags::RD) { + item.flags.remove(Flags::RD); + self.inner.api.unregister(item.fd, Interest::Readable); + } + }) + } + + pub(crate) fn resume_read(&self) { + self.with(|streams| { + let item = &mut streams[self.id]; + + log::debug!("Resume io read ({}), {:?}", self.id, item.fd); + if !item.flags.contains(Flags::RD) { + item.flags.insert(Flags::RD); + self.inner + .api + .register(item.fd, self.id, Interest::Readable); + } + }) + } + + pub(crate) fn resume_write(&self) { + self.with(|streams| { + let item = &mut streams[self.id]; + + if !item.flags.contains(Flags::WR) { + log::debug!("Resume io write ({}), {:?}", self.id, item.fd); + let result = item.context.with_write_buf(|buf| { + log::debug!("Writing io ({}), buf: {:?}", self.id, buf.len()); + + let slice = &buf[..]; + syscall!(break libc::write(item.fd, slice.as_ptr() as _, slice.len())) + }); + + if result.is_pending() { + log::debug!( + "Write is pending ({}), {:?}", + self.id, + item.context.flags() + ); + + item.flags.insert(Flags::WR); + 
self.inner + .api + .register(item.fd, self.id, Interest::Writable); + } + } + }) + } + + fn with(&self, f: F) -> R + where + F: FnOnce(&mut Slab>) -> R, + { + let mut inner = self.inner.streams.take().unwrap(); + let result = f(&mut inner); + self.inner.streams.set(Some(inner)); + result + } +} + +impl Clone for StreamCtl { + fn clone(&self) -> Self { + self.with(|streams| { + streams[self.id].ref_count += 1; + Self { + id: self.id, + inner: self.inner.clone(), + } + }) + } +} + +impl Drop for StreamCtl { + fn drop(&mut self) { + if let Some(mut streams) = self.inner.streams.take() { + log::debug!("Drop io ({}), {:?}", self.id, streams[self.id].fd); + + streams[self.id].ref_count -= 1; + if streams[self.id].ref_count == 0 { + let item = streams.remove(self.id); + if item.io.is_some() { + self.inner.api.unregister_all(item.fd); + } + } + self.inner.streams.set(Some(streams)); + } else { + let mut feed = self.inner.feed.take().unwrap(); + feed.push_back(self.id); + self.inner.feed.set(Some(feed)); + } + } +} + +impl PartialEq for StreamCtl { + #[inline] + fn eq(&self, other: &StreamCtl) -> bool { + self.id == other.id && ptr::eq(&self.inner, &other.inner) + } +} + +impl fmt::Debug for StreamCtl { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.with(|streams| { + f.debug_struct("StreamCtl") + .field("id", &self.id) + .field("io", &streams[self.id].io) + .finish() + }) + } +} diff --git a/ntex-net/src/rt/io.rs b/ntex-net/src/rt/io.rs new file mode 100644 index 00000000..75b8a003 --- /dev/null +++ b/ntex-net/src/rt/io.rs @@ -0,0 +1,107 @@ +use std::{any, future::poll_fn, io, task::Poll}; + +use ntex_io::{ + types, Handle, IoContext, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus, +}; +use ntex_runtime::{net::TcpStream, net::UnixStream, spawn}; + +use super::driver::{Closable, CompioOps, StreamCtl}; + +impl IoStream for super::TcpStream { + fn start(self, read: ReadContext, _: WriteContext) -> Option> { + let io = self.0; + let context = read.context(); + let ctl = CompioOps::current().register(io, context.clone()); + let ctl2 = ctl.clone(); + spawn(async move { run(ctl, context).await }).detach(); + + Some(Box::new(HandleWrapper(ctl2))) + } +} + +impl IoStream for super::UnixStream { + fn start(self, read: ReadContext, _: WriteContext) -> Option> { + let io = self.0; + let context = read.context(); + let ctl = CompioOps::current().register(io, context.clone()); + spawn(async move { run(ctl, context).await }).detach(); + + None + } +} + +struct HandleWrapper(StreamCtl); + +impl Handle for HandleWrapper { + fn query(&self, id: any::TypeId) -> Option> { + if id == any::TypeId::of::() { + let addr = self.0.with_io(|io| io.and_then(|io| io.peer_addr().ok())); + if let Some(addr) = addr { + return Some(Box::new(types::PeerAddr(addr))); + } + } + None + } +} + +impl Closable for TcpStream { + async fn close(self) -> io::Result<()> { + TcpStream::close(self).await + } +} + +impl Closable for UnixStream { + async fn close(self) -> io::Result<()> { + UnixStream::close(self).await + } +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +enum Status { + Shutdown, + Terminate, +} + +async fn run(ctl: StreamCtl, context: IoContext) { + // Handle io read readiness + let st = poll_fn(|cx| { + let read = match context.poll_read_ready(cx) { + Poll::Ready(ReadStatus::Ready) => { + ctl.resume_read(); + Poll::Pending + } + Poll::Ready(ReadStatus::Terminate) => Poll::Ready(()), + Poll::Pending => { + ctl.pause_read(); + Poll::Pending + } + }; + + let write = match 
context.poll_write_ready(cx) { + Poll::Ready(WriteStatus::Ready) => { + ctl.resume_write(); + Poll::Pending + } + Poll::Ready(WriteStatus::Shutdown) => Poll::Ready(Status::Shutdown), + Poll::Ready(WriteStatus::Terminate) => Poll::Ready(Status::Terminate), + Poll::Pending => Poll::Pending, + }; + + if read.is_pending() && write.is_pending() { + Poll::Pending + } else if write.is_ready() { + write + } else { + Poll::Ready(Status::Terminate) + } + }) + .await; + + ctl.resume_write(); + context.shutdown(st == Status::Shutdown).await; + + ctl.pause_all(); + let result = ctl.close().await; + + context.stopped(result.err()); +} diff --git a/ntex-net/src/rt/mod.rs b/ntex-net/src/rt/mod.rs new file mode 100644 index 00000000..945ad604 --- /dev/null +++ b/ntex-net/src/rt/mod.rs @@ -0,0 +1,60 @@ +#![allow(clippy::type_complexity)] +use std::{io::Result, net, net::SocketAddr}; + +use ntex_bytes::PoolRef; +use ntex_io::Io; + +mod connect; +mod driver; +mod io; + +/// Tcp stream wrapper for compio TcpStream +struct TcpStream(ntex_runtime::net::TcpStream); + +/// Tcp stream wrapper for compio UnixStream +struct UnixStream(ntex_runtime::net::UnixStream); + +/// Opens a TCP connection to a remote host. +pub async fn tcp_connect(addr: SocketAddr) -> Result { + let sock = connect::connect(addr).await?; + Ok(Io::new(TcpStream(sock))) +} + +/// Opens a TCP connection to a remote host and use specified memory pool. +pub async fn tcp_connect_in(addr: SocketAddr, pool: PoolRef) -> Result { + let sock = connect::connect(addr).await?; + Ok(Io::with_memory_pool(TcpStream(sock), pool)) +} + +/// Opens a unix stream connection. +pub async fn unix_connect<'a, P>(addr: P) -> Result +where + P: AsRef + 'a, +{ + let sock = connect::connect_unix(addr).await?; + Ok(Io::new(UnixStream(sock))) +} + +/// Opens a unix stream connection and specified memory pool. 
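
When the `default-rt` feature is the selected runtime, these constructors are what `ntex-net` re-exports through its compat layer, so higher-level connectors end up on the poll driver without code changes. A short usage sketch, with an obviously illustrative address:

    use std::net::SocketAddr;

    use ntex_net::Io;

    // Connect to a peer and get back an ntex `Io` backed by the poll driver.
    async fn dial(addr: SocketAddr) -> std::io::Result<Io> {
        ntex_net::tcp_connect(addr).await
    }
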
+pub async fn unix_connect_in<'a, P>(addr: P, pool: PoolRef) -> Result +where + P: AsRef + 'a, +{ + let sock = connect::connect_unix(addr).await?; + Ok(Io::with_memory_pool(UnixStream(sock), pool)) +} + +/// Convert std TcpStream to tokio's TcpStream +pub fn from_tcp_stream(stream: net::TcpStream) -> Result { + stream.set_nodelay(true)?; + Ok(Io::new(TcpStream(ntex_runtime::net::TcpStream::from_std( + stream, + )?))) +} + +/// Convert std UnixStream to tokio's UnixStream +pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result { + Ok(Io::new(UnixStream( + ntex_runtime::net::UnixStream::from_std(stream)?, + ))) +} diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index 92b15284..752916a8 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-rt" -version = "0.4.24" +version = "0.4.25" authors = ["ntex contributors "] description = "ntex runtime" keywords = ["network", "framework", "async", "futures"] @@ -29,6 +29,9 @@ tokio = ["tok-io"] # compio support compio = ["compio-driver", "compio-runtime"] +# default ntex runtime +default-rt = ["ntex-runtime", "ntex-iodriver"] + # async-std support async-std = ["async_std/unstable"] @@ -46,6 +49,9 @@ tok-io = { version = "1", package = "tokio", default-features = false, features "net", ], optional = true } +ntex-runtime = { version = "0.1", optional = true } +ntex-iodriver = { version = "0.1", optional = true } + [target.'cfg(target_os = "linux")'.dependencies] glomm-io = { version = "0.9", package = "glommio", optional = true } futures-channel = { version = "0.3", optional = true } diff --git a/ntex-rt/build.rs b/ntex-rt/build.rs index 7f7fe5a4..54b839a9 100644 --- a/ntex-rt/build.rs +++ b/ntex-rt/build.rs @@ -9,6 +9,7 @@ fn main() { "CARGO_FEATURE_TOKIO" => features.insert("tokio"), "CARGO_FEATURE_GLOMMIO" => features.insert("glommio"), "CARGO_FEATURE_ASYNC_STD" => features.insert("async-std"), + "CARGO_FEATURE_DEFAULT_RT" => features.insert("default-rt"), _ => false, }; } diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs index f0e2b457..b2fc2e3c 100644 --- a/ntex-rt/src/lib.rs +++ b/ntex-rt/src/lib.rs @@ -247,6 +247,161 @@ mod compio { } } +#[allow(dead_code)] +#[cfg(feature = "default-rt")] +mod default_rt { + use std::task::{ready, Context, Poll}; + use std::{fmt, future::poll_fn, future::Future, pin::Pin}; + + use ntex_runtime::Runtime; + + /// Runs the provided future, blocking the current thread until the future + /// completes. + pub fn block_on>(fut: F) { + log::info!( + "Starting compio runtime, driver {:?}", + ntex_iodriver::DriverType::current() + ); + let rt = Runtime::new().unwrap(); + rt.block_on(fut); + } + + /// Spawns a blocking task. + /// + /// The task will be spawned onto a thread pool specifically dedicated + /// to blocking tasks. This is useful to prevent long-running synchronous + /// operations from blocking the main futures executor. + pub fn spawn_blocking(f: F) -> JoinHandle + where + F: FnOnce() -> T + Send + Sync + 'static, + T: Send + 'static, + { + JoinHandle { + fut: Some(ntex_runtime::spawn_blocking(f)), + } + } + + /// Spawn a future on the current thread. This does not create a new Arbiter + /// or Arbiter address, it is simply a helper for spawning futures on the current + /// thread. + /// + /// # Panics + /// + /// This function panics if ntex system is not running. 
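
On top of that, `ntex-rt` keeps its usual thin surface for the new backend: `block_on` spins up an `ntex-runtime` `Runtime` and drives the root future, while `spawn_blocking` offloads synchronous work. A minimal sketch, assuming the `default-rt` feature is enabled and that the root future resolves to `()`:

    fn main() {
        ntex_rt::block_on(async {
            // run a CPU-bound closure off the runtime thread and await its result
            let sum = ntex_rt::spawn_blocking(|| (0u64..1_000).sum::<u64>())
                .await
                .expect("blocking task panicked");
            assert_eq!(sum, 499_500);
        });
    }
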
+ #[inline] + pub fn spawn(f: F) -> Task + where + F: Future + 'static, + { + let ptr = crate::CB.with(|cb| (cb.borrow().0)()); + let task = ntex_runtime::spawn(async move { + if let Some(ptr) = ptr { + let mut f = std::pin::pin!(f); + let result = poll_fn(|ctx| { + let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr)); + let result = f.as_mut().poll(ctx); + crate::CB.with(|cb| (cb.borrow().2)(new_ptr)); + result + }) + .await; + crate::CB.with(|cb| (cb.borrow().3)(ptr)); + result + } else { + f.await + } + }); + + Task { task: Some(task) } + } + + /// Executes a future on the current thread. This does not create a new Arbiter + /// or Arbiter address, it is simply a helper for executing futures on the current + /// thread. + /// + /// # Panics + /// + /// This function panics if ntex system is not running. + #[inline] + pub fn spawn_fn(f: F) -> Task + where + F: FnOnce() -> R + 'static, + R: Future + 'static, + { + spawn(async move { f().await }) + } + + /// A spawned task. + pub struct Task { + task: Option>, + } + + impl Task { + pub fn is_finished(&self) -> bool { + if let Some(hnd) = &self.task { + hnd.is_finished() + } else { + true + } + } + } + + impl Drop for Task { + fn drop(&mut self) { + self.task.take().unwrap().detach(); + } + } + + impl Future for Task { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(self.task.as_mut().unwrap()).poll(cx) + } + } + + #[derive(Debug, Copy, Clone)] + pub struct JoinError; + + impl fmt::Display for JoinError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "JoinError") + } + } + + impl std::error::Error for JoinError {} + + pub struct JoinHandle { + fut: Option>, + } + + impl JoinHandle { + pub fn is_finished(&self) -> bool { + if let Some(hnd) = &self.fut { + hnd.is_finished() + } else { + true + } + } + } + + impl Drop for JoinHandle { + fn drop(&mut self) { + self.fut.take().unwrap().detach(); + } + } + + impl Future for JoinHandle { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Poll::Ready( + ready!(Pin::new(self.fut.as_mut().unwrap()).poll(cx)) + .map_err(|_| JoinError), + ) + } + } +} + #[allow(dead_code)] #[cfg(feature = "async-std")] mod asyncstd { @@ -473,11 +628,15 @@ pub use self::glommio::*; #[cfg(feature = "compio")] pub use self::compio::*; +#[cfg(feature = "default-rt")] +pub use self::default_rt::*; + #[allow(dead_code)] #[cfg(all( not(feature = "tokio"), not(feature = "async-std"), not(feature = "compio"), + not(feature = "default-rt"), not(feature = "glommio") ))] mod no_rt { @@ -542,6 +701,7 @@ mod no_rt { not(feature = "tokio"), not(feature = "async-std"), not(feature = "compio"), + not(feature = "default-rt"), not(feature = "glommio") ))] pub use self::no_rt::*; diff --git a/ntex-runtime/Cargo.toml b/ntex-runtime/Cargo.toml new file mode 100644 index 00000000..a9b5c32d --- /dev/null +++ b/ntex-runtime/Cargo.toml @@ -0,0 +1,58 @@ +[package] +name = "ntex-runtime" +version = "0.1.0" +description = "Async runtime for ntex" +categories = ["asynchronous"] +keywords = ["async", "runtime"] +edition = { workspace = true } +authors = { workspace = true } +readme = { workspace = true } +license = { workspace = true } +repository = { workspace = true } + +[package.metadata.docs.rs] +all-features = true +default-target = "x86_64-unknown-linux-gnu" +rustdoc-args = ["--cfg", "docsrs"] +targets = [ + "x86_64-pc-windows-gnu", + "x86_64-unknown-linux-gnu", + "x86_64-apple-darwin", + "aarch64-apple-ios", + 
"aarch64-linux-android", + "x86_64-unknown-dragonfly", + "x86_64-unknown-freebsd", + "x86_64-unknown-illumos", + "x86_64-unknown-netbsd", + "x86_64-unknown-openbsd", +] + +[dependencies] +ntex-iodriver = "0.1" + +async-task = { workspace = true } +cfg-if = { workspace = true } +crossbeam-queue = { workspace = true } +scoped-tls = { workspace = true } +fxhash = { workspace = true } +log = { workspace = true } +socket2 = { workspace = true, features = ["all"] } + +# Windows specific dependencies +[target.'cfg(windows)'.dependencies] +windows-sys = { workspace = true, features = [ + "Win32_Foundation", + "Win32_Networking_WinSock", + "Win32_System_IO", +] } + +# Unix specific dependencies +[target.'cfg(unix)'.dependencies] +libc = { workspace = true } + +[target.'cfg(windows)'.dev-dependencies] +windows-sys = { workspace = true, features = ["Win32_UI_WindowsAndMessaging"] } + +[features] +io-uring = ["ntex-iodriver/io-uring"] +polling = ["ntex-iodriver/polling"] diff --git a/ntex-runtime/src/lib.rs b/ntex-runtime/src/lib.rs new file mode 100644 index 00000000..d706fedb --- /dev/null +++ b/ntex-runtime/src/lib.rs @@ -0,0 +1,13 @@ +//! The async runtime for ntex. + +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +// #![warn(missing_docs)] + +pub mod net; +mod op; +mod rt; + +pub use async_task::Task; +pub use rt::{ + spawn, spawn_blocking, submit, submit_with_flags, JoinHandle, Runtime, RuntimeBuilder, +}; diff --git a/ntex-runtime/src/net/mod.rs b/ntex-runtime/src/net/mod.rs new file mode 100644 index 00000000..fc0d3179 --- /dev/null +++ b/ntex-runtime/src/net/mod.rs @@ -0,0 +1,11 @@ +//! Network related. +//! +//! Currently, TCP/UDP/Unix socket are implemented. + +mod socket; +mod tcp; +mod unix; + +pub use socket::*; +pub use tcp::*; +pub use unix::*; diff --git a/ntex-runtime/src/net/socket.rs b/ntex-runtime/src/net/socket.rs new file mode 100644 index 00000000..ce29074a --- /dev/null +++ b/ntex-runtime/src/net/socket.rs @@ -0,0 +1,226 @@ +#![allow(clippy::missing_safety_doc)] +use std::{future::Future, io, mem, mem::MaybeUninit}; + +use ntex_iodriver::{impl_raw_fd, op::CloseSocket, op::ShutdownSocket, syscall, AsRawFd}; +use socket2::{Domain, Protocol, SockAddr, Socket as Socket2, Type}; + +#[derive(Debug)] +pub struct Socket { + socket: Socket2, +} + +impl Socket { + pub fn from_socket2(socket: Socket2) -> io::Result { + #[cfg(unix)] + { + #[cfg(not(any( + target_os = "android", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "fuchsia", + target_os = "hurd", + target_os = "illumos", + target_os = "linux", + target_os = "netbsd", + target_os = "openbsd", + target_os = "espidf", + target_os = "vita", + )))] + socket.set_cloexec(true)?; + #[cfg(any( + target_os = "ios", + target_os = "macos", + target_os = "tvos", + target_os = "watchos", + ))] + socket.set_nosigpipe(true)?; + + // On Linux we use blocking socket + // Newer kernels have the patch that allows to arm io_uring poll mechanism for + // non blocking socket when there is no connections in listen queue + // + // https://patchwork.kernel.org/project/linux-block/patch/f999615b-205c-49b7-b272-c4e42e45e09d@kernel.dk/#22949861 + if cfg!(not(all(target_os = "linux", feature = "io-uring"))) + || ntex_iodriver::DriverType::is_polling() + { + socket.set_nonblocking(true)?; + } + } + + Ok(Self { socket }) + } + + pub fn peer_addr(&self) -> io::Result { + self.socket.peer_addr() + } + + pub fn local_addr(&self) -> io::Result { + self.socket.local_addr() + } + + #[cfg(windows)] + pub async fn new( + domain: Domain, + ty: 
Type, + protocol: Option, + ) -> io::Result { + use std::panic::resume_unwind; + + let socket = crate::spawn_blocking(move || Socket2::new(domain, ty, protocol)) + .await + .unwrap_or_else(|e| resume_unwind(e))?; + Self::from_socket2(socket) + } + + #[cfg(unix)] + pub async fn new( + domain: Domain, + ty: Type, + protocol: Option, + ) -> io::Result { + use std::os::fd::FromRawFd; + + #[allow(unused_mut)] + let mut ty: i32 = ty.into(); + #[cfg(any( + target_os = "android", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "fuchsia", + target_os = "hurd", + target_os = "illumos", + target_os = "linux", + target_os = "netbsd", + target_os = "openbsd", + ))] + { + ty |= libc::SOCK_CLOEXEC; + } + + let op = ntex_iodriver::op::CreateSocket::new( + domain.into(), + ty, + protocol.map(|p| p.into()).unwrap_or_default(), + ); + let (res, _) = crate::submit(op).await; + let socket = unsafe { Socket2::from_raw_fd(res? as _) }; + + Self::from_socket2(socket) + } + + pub async fn bind( + addr: &SockAddr, + ty: Type, + protocol: Option, + ) -> io::Result { + let socket = Self::new(addr.domain(), ty, protocol).await?; + socket.socket.bind(addr)?; + Ok(socket) + } + + pub fn listen(&self, backlog: i32) -> io::Result<()> { + self.socket.listen(backlog) + } + + pub fn close(self) -> impl Future> { + let op = CloseSocket::from_raw_fd(self.as_raw_fd()); + let fut = crate::submit(op); + mem::forget(self); + async move { + fut.await.0?; + Ok(()) + } + } + + pub async fn shutdown(&self) -> io::Result<()> { + let op = ShutdownSocket::new(self.as_raw_fd(), std::net::Shutdown::Write); + crate::submit(op).await.0?; + Ok(()) + } + + #[cfg(unix)] + pub unsafe fn get_socket_option( + &self, + level: i32, + name: i32, + ) -> io::Result { + let mut value: MaybeUninit = MaybeUninit::uninit(); + let mut len = size_of::() as libc::socklen_t; + syscall!(libc::getsockopt( + self.socket.as_raw_fd(), + level, + name, + value.as_mut_ptr() as _, + &mut len + )) + .map(|_| { + debug_assert_eq!(len as usize, size_of::()); + // SAFETY: The value is initialized by `getsockopt`. + value.assume_init() + }) + } + + #[cfg(windows)] + pub unsafe fn get_socket_option( + &self, + level: i32, + name: i32, + ) -> io::Result { + let mut value: MaybeUninit = MaybeUninit::uninit(); + let mut len = size_of::() as i32; + syscall!( + SOCKET, + windows_sys::Win32::Networking::WinSock::getsockopt( + self.socket.as_raw_fd() as _, + level, + name, + value.as_mut_ptr() as _, + &mut len + ) + ) + .map(|_| { + debug_assert_eq!(len as usize, size_of::()); + // SAFETY: The value is initialized by `getsockopt`. 
+ value.assume_init() + }) + } + + #[cfg(unix)] + pub unsafe fn set_socket_option( + &self, + level: i32, + name: i32, + value: &T, + ) -> io::Result<()> { + syscall!(libc::setsockopt( + self.socket.as_raw_fd(), + level, + name, + value as *const _ as _, + std::mem::size_of::() as _ + )) + .map(|_| ()) + } + + #[cfg(windows)] + pub unsafe fn set_socket_option( + &self, + level: i32, + name: i32, + value: &T, + ) -> io::Result<()> { + syscall!( + SOCKET, + windows_sys::Win32::Networking::WinSock::setsockopt( + self.socket.as_raw_fd() as _, + level, + name, + value as *const _ as _, + std::mem::size_of::() as _ + ) + ) + .map(|_| ()) + } +} + +impl_raw_fd!(Socket, Socket2, socket, socket); diff --git a/ntex-runtime/src/net/tcp.rs b/ntex-runtime/src/net/tcp.rs new file mode 100644 index 00000000..6eff0d15 --- /dev/null +++ b/ntex-runtime/src/net/tcp.rs @@ -0,0 +1,50 @@ +use std::{future::Future, io, net::SocketAddr}; + +use ntex_iodriver::impl_raw_fd; +use socket2::Socket as Socket2; + +use crate::net::Socket; + +/// A TCP stream between a local and a remote socket. +/// +/// A TCP stream can either be created by connecting to an endpoint, via the +/// `connect` method, or by accepting a connection from a listener. +#[derive(Debug)] +pub struct TcpStream { + inner: Socket, +} + +impl TcpStream { + /// Creates new TcpStream from a std::net::TcpStream. + pub fn from_std(stream: std::net::TcpStream) -> io::Result { + Ok(Self { + inner: Socket::from_socket2(Socket2::from(stream))?, + }) + } + + /// Creates new TcpStream from a std::net::TcpStream. + pub fn from_socket(inner: Socket) -> Self { + Self { inner } + } + + /// Close the socket. + pub fn close(self) -> impl Future> { + self.inner.close() + } + + /// Returns the socket address of the remote peer of this TCP connection. + pub fn peer_addr(&self) -> io::Result { + self.inner + .peer_addr() + .map(|addr| addr.as_socket().expect("should be SocketAddr")) + } + + /// Returns the socket address of the local half of this TCP connection. + pub fn local_addr(&self) -> io::Result { + self.inner + .local_addr() + .map(|addr| addr.as_socket().expect("should be SocketAddr")) + } +} + +impl_raw_fd!(TcpStream, socket2::Socket, inner, socket); diff --git a/ntex-runtime/src/net/unix.rs b/ntex-runtime/src/net/unix.rs new file mode 100644 index 00000000..8bd00a56 --- /dev/null +++ b/ntex-runtime/src/net/unix.rs @@ -0,0 +1,98 @@ +use std::{future::Future, io}; + +use ntex_iodriver::impl_raw_fd; +use socket2::{SockAddr, Socket as Socket2}; + +use crate::net::Socket; + +/// A Unix stream between two local sockets on Windows & WSL. +/// +/// A Unix stream can either be created by connecting to an endpoint, via the +/// `connect` method. +#[derive(Debug)] +pub struct UnixStream { + inner: Socket, +} + +impl UnixStream { + #[cfg(unix)] + /// Creates new UnixStream from a std::os::unix::net::UnixStream. + pub fn from_std(stream: std::os::unix::net::UnixStream) -> io::Result { + Ok(Self { + inner: Socket::from_socket2(Socket2::from(stream))?, + }) + } + + /// Creates new TcpStream from a Socket. + pub fn from_socket(inner: Socket) -> Self { + Self { inner } + } + + /// Close the socket. If the returned future is dropped before polling, the + /// socket won't be closed. + pub fn close(self) -> impl Future> { + self.inner.close() + } + + /// Returns the socket path of the remote peer of this connection. 
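
The raw option accessors above stay deliberately thin: the caller supplies the level/option pair and the value type, and the helper only wraps the `getsockopt`/`setsockopt` call. A unix-only sketch of how a higher layer might use them; the `nodelay` helper is hypothetical and the value type is picked by the annotation.

    use std::io;

    use ntex_runtime::net::Socket;

    // Read TCP_NODELAY back from the kernel through the raw accessor.
    fn nodelay(socket: &Socket) -> io::Result<bool> {
        // SAFETY: TCP_NODELAY is an int-sized option, matching the annotation.
        let value: libc::c_int =
            unsafe { socket.get_socket_option(libc::IPPROTO_TCP, libc::TCP_NODELAY)? };
        Ok(value != 0)
    }
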
+ pub fn peer_addr(&self) -> io::Result { + #[allow(unused_mut)] + let mut addr = self.inner.peer_addr()?; + #[cfg(windows)] + { + fix_unix_socket_length(&mut addr); + } + Ok(addr) + } + + /// Returns the socket path of the local half of this connection. + pub fn local_addr(&self) -> io::Result { + self.inner.local_addr() + } +} + +impl_raw_fd!(UnixStream, socket2::Socket, inner, socket); + +#[cfg(windows)] +#[inline] +fn empty_unix_socket() -> SockAddr { + use windows_sys::Win32::Networking::WinSock::{AF_UNIX, SOCKADDR_UN}; + + // SAFETY: the length is correct + unsafe { + SockAddr::try_init(|addr, len| { + let addr: *mut SOCKADDR_UN = addr.cast(); + std::ptr::write( + addr, + SOCKADDR_UN { + sun_family: AF_UNIX, + sun_path: [0; 108], + }, + ); + std::ptr::write(len, 3); + Ok(()) + }) + } + // it is always Ok + .unwrap() + .1 +} + +// The peer addr returned after ConnectEx is buggy. It contains bytes that +// should not belong to the address. Luckily a unix path should not contain `\0` +// until the end. We can determine the path ending by that. +#[cfg(windows)] +#[inline] +fn fix_unix_socket_length(addr: &mut SockAddr) { + use windows_sys::Win32::Networking::WinSock::SOCKADDR_UN; + + // SAFETY: cannot construct non-unix socket address in safe way. + let unix_addr: &SOCKADDR_UN = unsafe { &*addr.as_ptr().cast() }; + let addr_len = match std::ffi::CStr::from_bytes_until_nul(&unix_addr.sun_path) { + Ok(str) => str.to_bytes_with_nul().len() + 2, + Err(_) => std::mem::size_of::(), + }; + unsafe { + addr.set_length(addr_len as _); + } +} diff --git a/ntex-runtime/src/op.rs b/ntex-runtime/src/op.rs new file mode 100644 index 00000000..99817179 --- /dev/null +++ b/ntex-runtime/src/op.rs @@ -0,0 +1,39 @@ +use std::{future::Future, io, pin::Pin, task::Context, task::Poll}; + +use ntex_iodriver::{Key, OpCode, PushEntry}; + +use crate::rt::Runtime; + +#[derive(Debug)] +pub(crate) struct OpFuture { + key: Option>, +} + +impl OpFuture { + pub(crate) fn new(key: Key) -> Self { + Self { key: Some(key) } + } +} + +impl Future for OpFuture { + type Output = ((io::Result, T), u32); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let res = Runtime::with_current(|r| r.poll_task(cx, self.key.take().unwrap())); + match res { + PushEntry::Pending(key) => { + self.key = Some(key); + Poll::Pending + } + PushEntry::Ready(res) => Poll::Ready(res), + } + } +} + +impl Drop for OpFuture { + fn drop(&mut self) { + if let Some(key) = self.key.take() { + Runtime::with_current(|r| r.cancel_op(key)); + } + } +} diff --git a/ntex-runtime/src/rt.rs b/ntex-runtime/src/rt.rs new file mode 100644 index 00000000..f36fd01e --- /dev/null +++ b/ntex-runtime/src/rt.rs @@ -0,0 +1,431 @@ +#![allow(clippy::type_complexity)] +use std::any::{Any, TypeId}; +use std::collections::{HashMap, VecDeque}; +use std::{ + cell::Cell, cell::RefCell, future::Future, io, sync::Arc, task::Context, thread, + time::Duration, +}; + +use async_task::{Runnable, Task}; +use crossbeam_queue::SegQueue; +use ntex_iodriver::{ + op::Asyncify, AsRawFd, Key, NotifyHandle, OpCode, Proactor, ProactorBuilder, PushEntry, + RawFd, +}; + +use crate::op::OpFuture; + +scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime); + +/// Type alias for `Task>>`, which resolves to an +/// `Err` when the spawned future panicked. 
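
Because the blocking closure runs under `catch_unwind` on the pool thread, a panic comes back through this handle instead of tearing the runtime down. A quick sketch of that behaviour; the exact payload carried by the `Err` variant is left unspecified here.

    use ntex_runtime::Runtime;

    fn main() -> std::io::Result<()> {
        let rt = Runtime::new()?;
        rt.block_on(async {
            let joined =
                ntex_runtime::spawn_blocking(|| -> u32 { panic!("boom") }).await;
            // the panic surfaces as Err; the runtime keeps running
            assert!(joined.is_err());
        });
        Ok(())
    }
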
+pub type JoinHandle = Task>>; + +pub struct RemoteHandle { + handle: NotifyHandle, + runnables: Arc, +} + +impl RemoteHandle { + /// Wake up runtime + pub fn notify(&self) { + self.handle.notify().ok(); + } + + /// Spawns a new asynchronous task, returning a [`Task`] for it. + /// + /// Spawning a task enables the task to execute concurrently to other tasks. + /// There is no guarantee that a spawned task will execute to completion. + pub fn spawn(&self, future: F) -> Task { + let runnables = self.runnables.clone(); + let handle = self.handle.clone(); + let schedule = move |runnable| { + runnables.schedule(runnable, &handle); + }; + let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) }; + runnable.schedule(); + task + } +} + +/// The async runtime for ntex. It is a thread local runtime, and cannot be +/// sent to other threads. +pub struct Runtime { + driver: Proactor, + runnables: Arc, + event_interval: usize, + data: RefCell, fxhash::FxBuildHasher>>, +} + +impl Runtime { + /// Create [`Runtime`] with default config. + pub fn new() -> io::Result { + Self::builder().build() + } + + /// Create a builder for [`Runtime`]. + pub fn builder() -> RuntimeBuilder { + RuntimeBuilder::new() + } + + #[allow(clippy::arc_with_non_send_sync)] + fn with_builder(builder: &RuntimeBuilder) -> io::Result { + Ok(Self { + driver: builder.proactor_builder.build()?, + runnables: Arc::new(RunnableQueue::new()), + event_interval: builder.event_interval, + data: RefCell::new(HashMap::default()), + }) + } + + /// Perform a function on the current runtime. + /// + /// ## Panics + /// + /// This method will panic if there are no running [`Runtime`]. + pub fn with_current T>(f: F) -> T { + #[cold] + fn not_in_ntex_runtime() -> ! { + panic!("not in a ntex runtime") + } + + if CURRENT_RUNTIME.is_set() { + CURRENT_RUNTIME.with(f) + } else { + not_in_ntex_runtime() + } + } + + /// Get current driver + pub fn driver(&self) -> &Proactor { + &self.driver + } + + /// Get handle for current runtime + pub fn handle(&self) -> RemoteHandle { + RemoteHandle { + handle: self.driver.handle(), + runnables: self.runnables.clone(), + } + } + + /// Attach a raw file descriptor/handle/socket to the runtime. + /// + /// You only need this when authoring your own high-level APIs. High-level + /// resources in this crate are attached automatically. + pub fn attach(&self, fd: RawFd) -> io::Result<()> { + self.driver.attach(fd) + } + + /// Block on the future till it completes. + pub fn block_on(&self, future: F) -> F::Output { + CURRENT_RUNTIME.set(self, || { + let mut result = None; + unsafe { self.spawn_unchecked(async { result = Some(future.await) }) }.detach(); + + self.runnables.run(self.event_interval); + loop { + if let Some(result) = result.take() { + return result; + } + + self.poll_with_driver(self.runnables.has_tasks(), || { + self.runnables.run(self.event_interval); + }); + } + }) + } + + /// Spawns a new asynchronous task, returning a [`Task`] for it. + /// + /// Spawning a task enables the task to execute concurrently to other tasks. + /// There is no guarantee that a spawned task will execute to completion. + pub fn spawn(&self, future: F) -> Task { + unsafe { self.spawn_unchecked(future) } + } + + /// Spawns a new asynchronous task, returning a [`Task`] for it. + /// + /// # Safety + /// + /// The caller should ensure the captured lifetime is long enough. 
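
The safe entry points wrap this primitive: `spawn` pins the future to `'static`, and `block_on` keeps interleaving task polls with driver polls until the root future resolves. The minimal lifecycle, as a sketch:

    use std::io;

    use ntex_runtime::Runtime;

    fn main() -> io::Result<()> {
        let rt = Runtime::new()?;
        let answer = rt.block_on(async {
            // scheduled on the same thread, interleaved with the poll driver
            let task = ntex_runtime::spawn(async { 21 * 2 });
            task.await
        });
        assert_eq!(answer, 42);
        Ok(())
    }
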
+ pub unsafe fn spawn_unchecked(&self, future: F) -> Task { + let runnables = self.runnables.clone(); + let handle = self.driver.handle(); + let schedule = move |runnable| { + runnables.schedule(runnable, &handle); + }; + let (runnable, task) = async_task::spawn_unchecked(future, schedule); + runnable.schedule(); + task + } + + /// Spawns a blocking task in a new thread, and wait for it. + /// + /// The task will not be cancelled even if the future is dropped. + pub fn spawn_blocking(&self, f: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let op = Asyncify::new(move || { + let res = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)); + (Ok(0), res) + }); + // It is safe to use `submit` here because the task is spawned immediately. + unsafe { + let fut = self.submit_with_flags(op); + self.spawn_unchecked(async move { fut.await.0 .1.into_inner() }) + } + } + + fn submit_raw( + &self, + op: T, + ) -> PushEntry, (io::Result, T)> { + self.driver.push(op) + } + + fn submit_with_flags( + &self, + op: T, + ) -> impl Future, T), u32)> { + let fut = self.submit_raw(op); + + async move { + match fut { + PushEntry::Pending(user_data) => OpFuture::new(user_data).await, + PushEntry::Ready(res) => { + // submit_flags won't be ready immediately, if ready, it must be error without + // flags + (res, 0) + } + } + } + } + + pub(crate) fn cancel_op(&self, op: Key) { + self.driver.cancel(op); + } + + pub(crate) fn poll_task( + &self, + cx: &mut Context, + op: Key, + ) -> PushEntry, ((io::Result, T), u32)> { + self.driver.pop(op).map_pending(|mut k| { + self.driver.update_waker(&mut k, cx.waker().clone()); + k + }) + } + + fn poll_with_driver(&self, has_tasks: bool, f: F) { + let timeout = if has_tasks { + Some(Duration::ZERO) + } else { + None + }; + + if let Err(e) = self.driver.poll(timeout, f) { + match e.kind() { + io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => { + log::debug!("expected error: {e}"); + } + _ => panic!("{e:?}"), + } + } + } + + /// Insert a type into this runtime. + pub fn insert(&self, val: T) { + self.data + .borrow_mut() + .insert(TypeId::of::(), Box::new(val)); + } + + /// Check if container contains entry + pub fn contains(&self) -> bool { + self.data.borrow().contains_key(&TypeId::of::()) + } + + /// Get a reference to a type previously inserted on this runtime. 
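
This small type map is what lets the driver front-ends act as per-runtime singletons; `ConnectOps::current()` and the stream driver both probe it with `get` and fall back to `insert`. The same pattern for a made-up per-runtime value:

    use std::{cell::Cell, rc::Rc};

    use ntex_runtime::Runtime;

    // Hypothetical per-runtime state; `Clone` is required by `Runtime::get`.
    #[derive(Clone, Default)]
    struct Counters {
        connects: Rc<Cell<u64>>,
    }

    fn counters() -> Counters {
        Runtime::with_current(|rt| {
            if let Some(existing) = rt.get::<Counters>() {
                existing
            } else {
                let fresh = Counters::default();
                rt.insert(fresh.clone());
                fresh
            }
        })
    }
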
+ pub fn get(&self) -> Option + where + T: Clone + 'static, + { + self.data + .borrow() + .get(&TypeId::of::()) + .and_then(|boxed| boxed.downcast_ref().cloned()) + } +} + +impl AsRawFd for Runtime { + fn as_raw_fd(&self) -> RawFd { + self.driver.as_raw_fd() + } +} + +impl Drop for Runtime { + fn drop(&mut self) { + CURRENT_RUNTIME.set(self, || { + while self.runnables.sync_runnables.pop().is_some() {} + loop { + let runnable = self.runnables.local_runnables.borrow_mut().pop_front(); + if runnable.is_none() { + break; + } + } + }) + } +} + +struct RunnableQueue { + id: thread::ThreadId, + idle: Cell, + local_runnables: RefCell>, + sync_runnables: SegQueue, +} + +impl RunnableQueue { + fn new() -> Self { + Self { + id: thread::current().id(), + idle: Cell::new(true), + local_runnables: RefCell::new(VecDeque::new()), + sync_runnables: SegQueue::new(), + } + } + + fn schedule(&self, runnable: Runnable, handle: &NotifyHandle) { + if self.id == thread::current().id() { + self.local_runnables.borrow_mut().push_back(runnable); + if self.idle.get() { + let _ = handle.notify(); + } + } else { + self.sync_runnables.push(runnable); + handle.notify().ok(); + } + } + + fn run(&self, event_interval: usize) { + self.idle.set(false); + for _ in 0..event_interval { + let task = self.local_runnables.borrow_mut().pop_front(); + if let Some(task) = task { + task.run(); + } else { + break; + } + } + + for _ in 0..event_interval { + if !self.sync_runnables.is_empty() { + if let Some(task) = self.sync_runnables.pop() { + task.run(); + continue; + } + } + break; + } + self.idle.set(true); + } + + fn has_tasks(&self) -> bool { + !(self.local_runnables.borrow().is_empty() && self.sync_runnables.is_empty()) + } +} + +/// Builder for [`Runtime`]. +#[derive(Debug, Clone)] +pub struct RuntimeBuilder { + proactor_builder: ProactorBuilder, + event_interval: usize, +} + +impl Default for RuntimeBuilder { + fn default() -> Self { + Self::new() + } +} + +impl RuntimeBuilder { + /// Create the builder with default config. + pub fn new() -> Self { + Self { + proactor_builder: ProactorBuilder::new(), + event_interval: 61, + } + } + + /// Replace proactor builder. + pub fn with_proactor(&mut self, builder: ProactorBuilder) -> &mut Self { + self.proactor_builder = builder; + self + } + + /// Sets the number of scheduler ticks after which the scheduler will poll + /// for external events (timers, I/O, and so on). + /// + /// A scheduler “tick” roughly corresponds to one poll invocation on a task. + pub fn event_interval(&mut self, val: usize) -> &mut Self { + self.event_interval = val; + self + } + + /// Build [`Runtime`]. + pub fn build(&self) -> io::Result { + Runtime::with_builder(self) + } +} + +/// Spawns a new asynchronous task, returning a [`Task`] for it. +/// +/// Spawning a task enables the task to execute concurrently to other tasks. +/// There is no guarantee that a spawned task will execute to completion. +/// +/// ## Panics +/// +/// This method doesn't create runtime. It tries to obtain the current runtime +/// by [`Runtime::with_current`]. +pub fn spawn(future: F) -> Task { + Runtime::with_current(|r| r.spawn(future)) +} + +/// Spawns a blocking task in a new thread, and wait for it. +/// +/// The task will not be cancelled even if the future is dropped. +/// +/// ## Panics +/// +/// This method doesn't create runtime. It tries to obtain the current runtime +/// by [`Runtime::with_current`]. 
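
The builder is also the tuning knob for the event loop: `event_interval` bounds how many task polls run between driver polls, and `with_proactor` swaps in a customized `ProactorBuilder`. A small sketch with an arbitrary interval value:

    use std::io;

    use ntex_runtime::Runtime;

    fn tuned_runtime() -> io::Result<Runtime> {
        // check for I/O and timer events after every 128 task polls
        // instead of the default 61
        Runtime::builder().event_interval(128).build()
    }
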
+pub fn spawn_blocking( + f: impl (FnOnce() -> T) + Send + 'static, +) -> JoinHandle { + Runtime::with_current(|r| r.spawn_blocking(f)) +} + +/// Submit an operation to the current runtime, and return a future for it. +/// +/// ## Panics +/// +/// This method doesn't create runtime. It tries to obtain the current runtime +/// by [`Runtime::with_current`]. +pub async fn submit(op: T) -> (io::Result, T) { + submit_with_flags(op).await.0 +} + +/// Submit an operation to the current runtime, and return a future for it with +/// flags. +/// +/// ## Panics +/// +/// This method doesn't create runtime. It tries to obtain the current runtime +/// by [`Runtime::with_current`]. +pub async fn submit_with_flags( + op: T, +) -> ((io::Result, T), u32) { + Runtime::with_current(|r| r.submit_with_flags(op)).await +} diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 96affad8..e13d0a14 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "2.11.0" +version = "2.12.0" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -54,6 +54,9 @@ async-std = ["ntex-net/async-std"] # compio runtime compio = ["ntex-net/compio"] +# default ntex runtime +default-rt = ["ntex-net/default-rt"] + # websocket support ws = ["dep:sha-1"] @@ -70,9 +73,9 @@ ntex-util = "2.8" ntex-bytes = "0.1.27" ntex-server = "2.7" ntex-h2 = "1.8.1" -ntex-rt = "0.4.22" -ntex-io = "2.9" -ntex-net = "2.4" +ntex-rt = "0.4.25" +ntex-io = "2.11" +ntex-net = "2.5" ntex-tls = "2.3" base64 = "0.22" diff --git a/ntex/tests/http_awc_client.rs b/ntex/tests/http_awc_client.rs index eb18624b..3c953c58 100644 --- a/ntex/tests/http_awc_client.rs +++ b/ntex/tests/http_awc_client.rs @@ -220,7 +220,7 @@ async fn test_connection_reuse() { ))) }); - let client = Client::build().timeout(Seconds(10)).finish(); + let client = Client::build().timeout(Seconds(30)).finish(); // req 1 let request = client.get(srv.url("/")).send(); @@ -255,7 +255,7 @@ async fn test_connection_force_close() { ))) }); - let client = Client::build().timeout(Seconds(10)).finish(); + let client = Client::build().timeout(Seconds(30)).finish(); // req 1 let request = client.get(srv.url("/")).force_close().send(); @@ -263,7 +263,7 @@ async fn test_connection_force_close() { assert!(response.status().is_success()); // req 2 - let client = Client::build().timeout(Seconds(10)).finish(); + let client = Client::build().timeout(Seconds(30)).finish(); let req = client.post(srv.url("/")).force_close(); let response = req.send().await.unwrap(); assert!(response.status().is_success()); @@ -291,7 +291,7 @@ async fn test_connection_server_close() { ))) }); - let client = Client::build().timeout(Seconds(10)).finish(); + let client = Client::build().timeout(Seconds(30)).finish(); // req 1 let request = client.get(srv.url("/")).send(); @@ -814,7 +814,7 @@ async fn client_read_until_eof() { // client request let req = Client::build() - .timeout(Seconds(5)) + .timeout(Seconds(30)) .finish() .get(format!("http://{}/", addr).as_str()); let mut response = req.send().await.unwrap(); diff --git a/ntex/tests/server.rs b/ntex/tests/server.rs index 09432a99..48a06369 100644 --- a/ntex/tests/server.rs +++ b/ntex/tests/server.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; #[cfg(feature = "tokio")] use std::{io, sync::Arc}; -use std::{io::Read, net, sync::mpsc, thread, time}; +use std::{io::Read, io::Write, net, sync::mpsc, thread, time}; use ntex::codec::BytesCodec; use ntex::io::Io; @@ 
-71,6 +71,7 @@ async fn test_listen() { #[ntex::test] #[cfg(unix)] +#[allow(clippy::unused_io_amount)] async fn test_run() { let addr = TestServer::unused_addr(); let (tx, rx) = mpsc::channel(); @@ -80,6 +81,7 @@ async fn test_run() { sys.run(move || { let srv = build() .backlog(100) + .workers(1) .disable_signals() .bind("test", addr, move |_| { fn_service(|io: Io| async move { @@ -99,6 +101,7 @@ async fn test_run() { let mut buf = [1u8; 4]; let mut conn = net::TcpStream::connect(addr).unwrap(); + conn.write(&b"test"[..]).unwrap(); let _ = conn.read_exact(&mut buf); assert_eq!(buf, b"test"[..]); diff --git a/ntex/tests/web_server.rs b/ntex/tests/web_server.rs index cbb7956d..b7bf1b75 100644 --- a/ntex/tests/web_server.rs +++ b/ntex/tests/web_server.rs @@ -868,7 +868,7 @@ async fn test_reading_deflate_encoding_large_random_rustls() { // client request let req = srv .post("/") - .timeout(Millis(10_000)) + .timeout(Millis(30_000)) .header(CONTENT_ENCODING, "deflate") .send_stream(TestBody::new(Bytes::from(enc), 1024)); @@ -909,7 +909,7 @@ async fn test_reading_deflate_encoding_large_random_rustls_h1() { // client request let req = srv .post("/") - .timeout(Millis(10_000)) + .timeout(Millis(30_000)) .header(CONTENT_ENCODING, "deflate") .send_stream(TestBody::new(Bytes::from(enc), 1024)); @@ -950,7 +950,7 @@ async fn test_reading_deflate_encoding_large_random_rustls_h2() { // client request let req = srv .post("/") - .timeout(Millis(10_000)) + .timeout(Millis(30_000)) .header(CONTENT_ENCODING, "deflate") .send_stream(TestBody::new(Bytes::from(enc), 1024));