diff --git a/Cargo.toml b/Cargo.toml index 7d6709c6..1224f10f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,6 @@ members = [ "ntex-http", "ntex-router", "ntex-rt", - "ntex-neon", "ntex-net", "ntex-server", "ntex-service", @@ -33,7 +32,6 @@ ntex = { path = "ntex" } ntex-bytes = { path = "ntex-bytes" } ntex-codec = { path = "ntex-codec" } ntex-io = { path = "ntex-io" } -ntex-neon = { path = "ntex-neon" } ntex-net = { path = "ntex-net" } ntex-http = { path = "ntex-http" } ntex-router = { path = "ntex-router" } @@ -47,6 +45,8 @@ ntex-util = { path = "ntex-util" } ntex-compio = { path = "ntex-compio" } ntex-tokio = { path = "ntex-tokio" } +ntex-neon = { git = "https://github.com/ntex-rs/neon.git" } + [workspace.dependencies] async-task = "4.5.0" bitflags = "2" diff --git a/ntex-neon/Cargo.toml b/ntex-neon/Cargo.toml deleted file mode 100644 index 2c3ff199..00000000 --- a/ntex-neon/Cargo.toml +++ /dev/null @@ -1,81 +0,0 @@ -[package] -name = "ntex-neon" -version = "0.1.0" -description = "Async runtime for ntex" -categories = ["asynchronous"] -keywords = ["async", "runtime"] -edition = { workspace = true } -authors = { workspace = true } -readme = { workspace = true } -license = { workspace = true } -repository = { workspace = true } - -[package.metadata.docs.rs] -all-features = true -default-target = "x86_64-unknown-linux-gnu" -rustdoc-args = ["--cfg", "docsrs"] -targets = [ - "x86_64-pc-windows-gnu", - "x86_64-unknown-linux-gnu", - "x86_64-apple-darwin", - "aarch64-apple-ios", - "aarch64-linux-android", - "x86_64-unknown-dragonfly", - "x86_64-unknown-freebsd", - "x86_64-unknown-illumos", - "x86_64-unknown-netbsd", - "x86_64-unknown-openbsd", -] - -[dependencies] -async-task = { workspace = true } -bitflags = { workspace = true } -cfg-if = { workspace = true } -crossbeam-queue = { workspace = true } -fxhash = { workspace = true } -nohash-hasher = { workspace = true } -log = { workspace = true } -scoped-tls = { workspace = true } -socket2 = { workspace = true, features = ["all"] } - -# Windows specific dependencies -[target.'cfg(windows)'.dependencies] -aligned-array = "1.0.1" -windows-sys = { workspace = true, features = [ - "Win32_Foundation", - "Win32_Networking_WinSock", - "Win32_Security", - "Win32_Storage_FileSystem", - "Win32_System_Console", - "Win32_System_IO", - "Win32_System_Pipes", - "Win32_System_SystemServices", - "Win32_System_Threading", - "Win32_System_WindowsProgramming", -] } - -# Unix specific dependencies -[target.'cfg(unix)'.dependencies] -crossbeam-channel = { workspace = true } -crossbeam-queue = { workspace = true } -libc = { workspace = true } -polling = { workspace = true } - -# Linux specific dependencies -[target.'cfg(target_os = "linux")'.dependencies] -io-uring = { workspace = true, optional = true } -polling = { workspace = true, optional = true } - -# Other platform dependencies -[target.'cfg(all(not(target_os = "linux"), unix))'.dependencies] -polling = { workspace = true } - -[target.'cfg(windows)'.dev-dependencies] -windows-sys = { workspace = true, features = ["Win32_UI_WindowsAndMessaging"] } - -[build-dependencies] -cfg_aliases = { workspace = true } - -[features] -default = ["polling"] -polling = ["dep:polling"] diff --git a/ntex-neon/src/driver/asyncify.rs b/ntex-neon/src/driver/asyncify.rs deleted file mode 100644 index f7a782a0..00000000 --- a/ntex-neon/src/driver/asyncify.rs +++ /dev/null @@ -1,128 +0,0 @@ -use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc}; -use std::{fmt, time::Duration}; - -use crossbeam_channel::{bounded, Receiver, Sender, TrySendError}; - -/// An error that may be emitted when all worker threads are busy. It simply -/// returns the dispatchable value with a convenient [`fmt::Debug`] and -/// [`fmt::Display`] implementation. -#[derive(Copy, Clone, PartialEq, Eq)] -pub struct DispatchError(pub T); - -impl DispatchError { - /// Consume the error, yielding the dispatchable that failed to be sent. - pub fn into_inner(self) -> T { - self.0 - } -} - -impl fmt::Debug for DispatchError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - "DispatchError(..)".fmt(f) - } -} - -impl fmt::Display for DispatchError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - "all threads are busy".fmt(f) - } -} - -impl std::error::Error for DispatchError {} - -type BoxedDispatchable = Box; - -/// A trait for dispatching a closure. It's implemented for all `FnOnce() + Send -/// + 'static` but may also be implemented for any other types that are `Send` -/// and `'static`. -pub trait Dispatchable: Send + 'static { - /// Run the dispatchable - fn run(self: Box); -} - -impl Dispatchable for F -where - F: FnOnce() + Send + 'static, -{ - fn run(self: Box) { - (*self)() - } -} - -struct CounterGuard(Arc); - -impl Drop for CounterGuard { - fn drop(&mut self) { - self.0.fetch_sub(1, Ordering::AcqRel); - } -} - -fn worker( - receiver: Receiver, - counter: Arc, - timeout: Duration, -) -> impl FnOnce() { - move || { - counter.fetch_add(1, Ordering::AcqRel); - let _guard = CounterGuard(counter); - while let Ok(f) = receiver.recv_timeout(timeout) { - f.run(); - } - } -} - -/// A thread pool to perform blocking operations in other threads. -#[derive(Debug, Clone)] -pub struct AsyncifyPool { - sender: Sender, - receiver: Receiver, - counter: Arc, - thread_limit: usize, - recv_timeout: Duration, -} - -impl AsyncifyPool { - /// Create [`AsyncifyPool`] with thread number limit and channel receive - /// timeout. - pub fn new(thread_limit: usize, recv_timeout: Duration) -> Self { - let (sender, receiver) = bounded(0); - Self { - sender, - receiver, - counter: Arc::new(AtomicUsize::new(0)), - thread_limit, - recv_timeout, - } - } - - /// Send a dispatchable, usually a closure, to another thread. Usually the - /// user should not use it. When all threads are busy and thread number - /// limit has been reached, it will return an error with the original - /// dispatchable. - pub fn dispatch(&self, f: D) -> Result<(), DispatchError> { - match self.sender.try_send(Box::new(f) as BoxedDispatchable) { - Ok(_) => Ok(()), - Err(e) => match e { - TrySendError::Full(f) => { - if self.counter.load(Ordering::Acquire) >= self.thread_limit { - // Safety: we can ensure the type - Err(DispatchError(*unsafe { - Box::from_raw(Box::into_raw(f).cast()) - })) - } else { - std::thread::spawn(worker( - self.receiver.clone(), - self.counter.clone(), - self.recv_timeout, - )); - self.sender.send(f).expect("the channel should not be full"); - Ok(()) - } - } - TrySendError::Disconnected(_) => { - unreachable!("receiver should not all disconnected") - } - }, - } - } -} diff --git a/ntex-neon/src/driver/driver_type.rs b/ntex-neon/src/driver/driver_type.rs deleted file mode 100644 index 3738bb9d..00000000 --- a/ntex-neon/src/driver/driver_type.rs +++ /dev/null @@ -1,108 +0,0 @@ -use std::sync::atomic::{AtomicU8, Ordering}; - -const UNINIT: u8 = u8::MAX; -const IO_URING: u8 = 0; -const POLLING: u8 = 1; -const IOCP: u8 = 2; - -static DRIVER_TYPE: AtomicU8 = AtomicU8::new(UNINIT); - -/// Representing underlying driver type the fusion driver is using -#[repr(u8)] -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub enum DriverType { - /// Using `polling` driver - Poll = POLLING, - - /// Using `io-uring` driver - IoUring = IO_URING, - - /// Using `iocp` driver - IOCP = IOCP, -} - -impl DriverType { - fn from_num(n: u8) -> Self { - match n { - IO_URING => Self::IoUring, - POLLING => Self::Poll, - IOCP => Self::IOCP, - _ => unreachable!("invalid driver type"), - } - } - - /// Get the underlying driver type - fn get() -> DriverType { - cfg_if::cfg_if! { - if #[cfg(windows)] { - DriverType::IOCP - } else if #[cfg(all(target_os = "linux", feature = "polling", feature = "io-uring"))] { - use io_uring::opcode::*; - - // Add more opcodes here if used - const USED_OP: &[u8] = &[ - Read::CODE, - Readv::CODE, - Write::CODE, - Writev::CODE, - Fsync::CODE, - Accept::CODE, - Connect::CODE, - RecvMsg::CODE, - SendMsg::CODE, - AsyncCancel::CODE, - OpenAt::CODE, - Close::CODE, - Shutdown::CODE, - ]; - - (|| { - let uring = io_uring::IoUring::new(2)?; - let mut probe = io_uring::Probe::new(); - uring.submitter().register_probe(&mut probe)?; - if USED_OP.iter().all(|op| probe.is_supported(*op)) { - std::io::Result::Ok(DriverType::IoUring) - } else { - Ok(DriverType::Poll) - } - })() - .unwrap_or(DriverType::Poll) // Should we fail here? - } else if #[cfg(all(target_os = "linux", feature = "io-uring"))] { - DriverType::IoUring - } else if #[cfg(unix)] { - DriverType::Poll - } else { - compile_error!("unsupported platform"); - } - } - } - - /// Get the underlying driver type and cache it. Following calls will return - /// the cached value. - pub fn current() -> DriverType { - match DRIVER_TYPE.load(Ordering::Acquire) { - UNINIT => {} - x => return DriverType::from_num(x), - } - let dev_ty = Self::get(); - - DRIVER_TYPE.store(dev_ty as u8, Ordering::Release); - - dev_ty - } - - /// Check if the current driver is `polling` - pub fn is_polling() -> bool { - Self::current() == DriverType::Poll - } - - /// Check if the current driver is `io-uring` - pub fn is_iouring() -> bool { - Self::current() == DriverType::IoUring - } - - /// Check if the current driver is `iocp` - pub fn is_iocp() -> bool { - Self::current() == DriverType::IOCP - } -} diff --git a/ntex-neon/src/driver/key.rs b/ntex-neon/src/driver/key.rs deleted file mode 100644 index 97b7495e..00000000 --- a/ntex-neon/src/driver/key.rs +++ /dev/null @@ -1,215 +0,0 @@ -use std::{io, marker::PhantomData, mem::MaybeUninit, pin::Pin, task::Waker}; - -use super::{OpCode, Overlapped, PushEntry, RawFd}; - -/// An operation with other needed information. It should be allocated on the -/// heap. The pointer to this struct is used as `user_data`, and on Windows, it -/// is used as the pointer to `OVERLAPPED`. -/// -/// `*const RawOp` can be obtained from any `Key` by -/// first casting `Key::user_data` to `*const RawOp<()>`, then upcasted with -/// `upcast_fn`. It is done in [`Key::as_op_pin`]. -#[repr(C)] -pub(crate) struct RawOp { - header: Overlapped, - // The cancelled flag and the result here are manual reference counting. The driver holds the - // strong ref until it completes; the runtime holds the strong ref until the future is - // dropped. - cancelled: bool, - // The metadata in `*mut RawOp` - metadata: usize, - result: PushEntry, io::Result>, - flags: u32, - op: T, -} - -#[repr(C)] -union OpCodePtrRepr { - ptr: *mut RawOp, - components: OpCodePtrComponents, -} - -#[repr(C)] -#[derive(Clone, Copy)] -struct OpCodePtrComponents { - data_pointer: *mut (), - metadata: usize, -} - -fn opcode_metadata() -> usize { - let mut op = MaybeUninit::>::uninit(); - // SAFETY: same as `core::ptr::metadata`. - unsafe { - OpCodePtrRepr { - ptr: op.as_mut_ptr(), - } - .components - .metadata - } -} - -const unsafe fn opcode_dyn_mut(ptr: *mut (), metadata: usize) -> *mut RawOp { - OpCodePtrRepr { - components: OpCodePtrComponents { - metadata, - data_pointer: ptr, - }, - } - .ptr -} - -/// A typed wrapper for key of Ops submitted into driver. It doesn't free the -/// inner on dropping. Instead, the memory is managed by the proactor. The inner -/// is only freed when: -/// -/// 1. The op is completed and the future asks the result. `into_inner` will be -/// called by the proactor. -/// 2. The op is completed and the future cancels it. `into_box` will be called -/// by the proactor. -#[derive(PartialEq, Eq, Hash)] -pub struct Key { - user_data: *mut (), - _p: PhantomData>>, -} - -impl Unpin for Key {} - -impl Key { - /// Create [`RawOp`] and get the [`Key`] to it. - pub(crate) fn new(driver: RawFd, op: T) -> Self { - let header = Overlapped::new(driver); - let raw_op = Box::new(RawOp { - header, - cancelled: false, - metadata: opcode_metadata::(), - result: PushEntry::Pending(None), - flags: 0, - op, - }); - unsafe { Self::new_unchecked(Box::into_raw(raw_op) as _) } - } -} - -impl Key { - /// Create a new `Key` with the given user data. - /// - /// # Safety - /// - /// Caller needs to ensure that `T` does correspond to `user_data` in driver - /// this `Key` is created with. In most cases, it is enough to let `T` be - /// `dyn OpCode`. - pub unsafe fn new_unchecked(user_data: usize) -> Self { - Self { - user_data: user_data as _, - _p: PhantomData, - } - } - - /// Get the unique user-defined data. - pub fn user_data(&self) -> usize { - self.user_data as _ - } - - fn as_opaque(&self) -> &RawOp<()> { - // SAFETY: user_data is unique and RawOp is repr(C). - unsafe { &*(self.user_data as *const RawOp<()>) } - } - - fn as_opaque_mut(&mut self) -> &mut RawOp<()> { - // SAFETY: see `as_opaque`. - unsafe { &mut *(self.user_data as *mut RawOp<()>) } - } - - fn as_dyn_mut_ptr(&mut self) -> *mut RawOp { - let user_data = self.user_data; - let this = self.as_opaque_mut(); - // SAFETY: metadata from `Key::new`. - unsafe { opcode_dyn_mut(user_data, this.metadata) } - } - - /// A pointer to OVERLAPPED. - #[cfg(windows)] - pub(crate) fn as_mut_ptr(&mut self) -> *mut Overlapped { - &mut self.as_opaque_mut().header - } - - /// Cancel the op, decrease the ref count. The return value indicates if the - /// op is completed. If so, the op should be dropped because it is - /// useless. - pub(crate) fn set_cancelled(&mut self) -> bool { - self.as_opaque_mut().cancelled = true; - self.has_result() - } - - /// Complete the op, decrease the ref count. Wake the future if a waker is - /// set. The return value indicates if the op is cancelled. If so, the - /// op should be dropped because it is useless. - pub(crate) fn set_result(&mut self, res: io::Result) -> bool { - let this = unsafe { &mut *self.as_dyn_mut_ptr() }; - #[cfg(all(target_os = "linux", feature = "io-uring"))] - if let Ok(res) = res { - unsafe { - Pin::new_unchecked(&mut this.op).set_result(res); - } - } - if let PushEntry::Pending(Some(w)) = - std::mem::replace(&mut this.result, PushEntry::Ready(res)) - { - w.wake(); - } - this.cancelled - } - - /// Whether the op is completed. - pub(crate) fn has_result(&self) -> bool { - self.as_opaque().result.is_ready() - } - - /// Set waker of the current future. - pub(crate) fn set_waker(&mut self, waker: Waker) { - if let PushEntry::Pending(w) = &mut self.as_opaque_mut().result { - *w = Some(waker) - } - } - - /// Get the inner [`RawOp`]. It is usually used to drop the inner - /// immediately, without knowing about the inner `T`. - /// - /// # Safety - /// - /// Call it only when the op is cancelled and completed, which is the case - /// when the ref count becomes zero. See doc of [`Key::set_cancelled`] - /// and [`Key::set_result`]. - pub(crate) unsafe fn into_box(mut self) -> Box> { - Box::from_raw(self.as_dyn_mut_ptr()) - } -} - -impl Key { - /// Get the inner result if it is completed. - /// - /// # Safety - /// - /// Call it only when the op is completed, otherwise it is UB. - pub(crate) unsafe fn into_inner(self) -> (io::Result, T) { - let op = unsafe { Box::from_raw(self.user_data as *mut RawOp) }; - (op.result.take_ready().unwrap_unchecked(), op.op) - } -} - -impl Key { - /// Pin the inner op. - pub(crate) fn as_op_pin(&mut self) -> Pin<&mut dyn OpCode> { - // SAFETY: the inner won't be moved. - unsafe { - let this = &mut *self.as_dyn_mut_ptr(); - Pin::new_unchecked(&mut this.op) - } - } -} - -impl std::fmt::Debug for Key { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Key({})", self.user_data()) - } -} diff --git a/ntex-neon/src/driver/mod.rs b/ntex-neon/src/driver/mod.rs deleted file mode 100644 index 346a7305..00000000 --- a/ntex-neon/src/driver/mod.rs +++ /dev/null @@ -1,455 +0,0 @@ -//! The platform-specified driver. -//! Some types differ by compilation target. - -#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -#![allow(clippy::type_complexity)] - -// #[cfg(all( -// target_os = "linux", -// not(feature = "io-uring"), -// not(feature = "polling") -// ))] -// compile_error!( -// "You must choose at least one of these features: [\"io-uring\", \"polling\"]" -// ); - -#[cfg(unix)] -use std::{io, task::Poll, task::Waker, time::Duration}; - -#[cfg(unix)] -mod key; -#[cfg(unix)] -pub use key::Key; - -#[cfg(unix)] -pub mod op; -#[cfg(unix)] -#[cfg_attr(docsrs, doc(cfg(all())))] -mod unix; -#[cfg(unix)] -use unix::Overlapped; - -#[cfg(unix)] -mod asyncify; -#[cfg(unix)] -pub use asyncify::*; - -mod driver_type; -pub use driver_type::*; - -cfg_if::cfg_if! { - //if #[cfg(windows)] { - // #[path = "iocp/mod.rs"] - // mod sys; - //} else - if #[cfg(all(target_os = "linux", feature = "io-uring"))] { - #[path = "uring/mod.rs"] - mod sys; - } else if #[cfg(unix)] { - #[path = "poll/mod.rs"] - mod sys; - } -} - -#[cfg(unix)] -pub use sys::*; - -#[cfg(windows)] -#[macro_export] -#[doc(hidden)] -macro_rules! syscall { - (BOOL, $e:expr) => { - $crate::syscall!($e, == 0) - }; - (SOCKET, $e:expr) => { - $crate::syscall!($e, != 0) - }; - (HANDLE, $e:expr) => { - $crate::syscall!($e, == ::windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE) - }; - ($e:expr, $op: tt $rhs: expr) => {{ - #[allow(unused_unsafe)] - let res = unsafe { $e }; - if res $op $rhs { - Err(::std::io::Error::last_os_error()) - } else { - Ok(res) - } - }}; -} - -/// Helper macro to execute a system call -#[cfg(unix)] -#[macro_export] -#[doc(hidden)] -macro_rules! syscall { - (break $e:expr) => { - loop { - match $crate::syscall!($e) { - Ok(fd) => break ::std::task::Poll::Ready(Ok(fd as usize)), - Err(e) if e.kind() == ::std::io::ErrorKind::WouldBlock || e.raw_os_error() == Some(::libc::EINPROGRESS) - => break ::std::task::Poll::Pending, - Err(e) if e.kind() == ::std::io::ErrorKind::Interrupted => {}, - Err(e) => break ::std::task::Poll::Ready(Err(e)), - } - } - }; - ($e:expr, $f:ident($fd:expr)) => { - match $crate::syscall!(break $e) { - ::std::task::Poll::Pending => Ok($crate::sys::Decision::$f($fd)), - ::std::task::Poll::Ready(Ok(res)) => Ok($crate::sys::Decision::Completed(res)), - ::std::task::Poll::Ready(Err(e)) => Err(e), - } - }; - ($e:expr) => {{ - #[allow(unused_unsafe)] - let res = unsafe { $e }; - if res == -1 { - Err(::std::io::Error::last_os_error()) - } else { - Ok(res) - } - }}; -} - -#[macro_export] -#[doc(hidden)] -macro_rules! impl_raw_fd { - ($t:ty, $it:ty, $inner:ident) => { - impl $crate::driver::AsRawFd for $t { - fn as_raw_fd(&self) -> $crate::driver::RawFd { - self.$inner.as_raw_fd() - } - } - #[cfg(unix)] - impl std::os::fd::FromRawFd for $t { - unsafe fn from_raw_fd(fd: $crate::driver::RawFd) -> Self { - Self { - $inner: std::os::fd::FromRawFd::from_raw_fd(fd), - } - } - } - }; - ($t:ty, $it:ty, $inner:ident,file) => { - $crate::impl_raw_fd!($t, $it, $inner); - #[cfg(windows)] - impl std::os::windows::io::FromRawHandle for $t { - unsafe fn from_raw_handle(handle: std::os::windows::io::RawHandle) -> Self { - Self { - $inner: std::os::windows::io::FromRawHandle::from_raw_handle(handle), - } - } - } - #[cfg(windows)] - impl std::os::windows::io::AsRawHandle for $t { - fn as_raw_handle(&self) -> std::os::windows::io::RawHandle { - self.$inner.as_raw_handle() - } - } - }; - ($t:ty, $it:ty, $inner:ident,socket) => { - $crate::impl_raw_fd!($t, $it, $inner); - #[cfg(windows)] - impl std::os::windows::io::FromRawSocket for $t { - unsafe fn from_raw_socket(sock: std::os::windows::io::RawSocket) -> Self { - Self { - $inner: std::os::windows::io::FromRawSocket::from_raw_socket(sock), - } - } - } - #[cfg(windows)] - impl std::os::windows::io::AsRawSocket for $t { - fn as_raw_socket(&self) -> std::os::windows::io::RawSocket { - self.$inner.as_raw_socket() - } - } - }; -} - -/// The return type of [`Proactor::push`]. -pub enum PushEntry { - /// The operation is pushed to the submission queue. - Pending(K), - /// The operation is ready and returns. - Ready(R), -} - -impl PushEntry { - /// Get if the current variant is [`PushEntry::Ready`]. - pub const fn is_ready(&self) -> bool { - matches!(self, Self::Ready(_)) - } - - /// Take the ready variant if exists. - pub fn take_ready(self) -> Option { - match self { - Self::Pending(_) => None, - Self::Ready(res) => Some(res), - } - } - - /// Map the [`PushEntry::Pending`] branch. - pub fn map_pending(self, f: impl FnOnce(K) -> L) -> PushEntry { - match self { - Self::Pending(k) => PushEntry::Pending(f(k)), - Self::Ready(r) => PushEntry::Ready(r), - } - } - - /// Map the [`PushEntry::Ready`] branch. - pub fn map_ready(self, f: impl FnOnce(R) -> S) -> PushEntry { - match self { - Self::Pending(k) => PushEntry::Pending(k), - Self::Ready(r) => PushEntry::Ready(f(r)), - } - } -} - -#[cfg(unix)] -/// Low-level actions of completion-based IO. -/// It owns the operations to keep the driver safe. -pub struct Proactor { - driver: Driver, -} - -#[cfg(unix)] -impl Proactor { - /// Create [`Proactor`] with 1024 entries. - pub fn new() -> io::Result { - Self::builder().build() - } - - /// Create [`ProactorBuilder`] to config the proactor. - pub fn builder() -> ProactorBuilder { - ProactorBuilder::new() - } - - fn with_builder(builder: &ProactorBuilder) -> io::Result { - Ok(Self { - driver: Driver::new(builder)?, - }) - } - - /// Cancel an operation with the pushed user-defined data. - /// - /// The cancellation is not reliable. The underlying operation may continue, - /// but just don't return from [`Proactor::poll`]. Therefore, although an - /// operation is cancelled, you should not reuse its `user_data`. - pub fn cancel(&self, mut op: Key) -> Option<(io::Result, T)> { - if op.set_cancelled() { - // SAFETY: completed. - Some(unsafe { op.into_inner() }) - } else { - None - } - } - - /// Push an operation into the driver, and return the unique key, called - /// user-defined data, associated with it. - pub fn push( - &self, - op: T, - ) -> PushEntry, (io::Result, T)> { - let mut op = self.driver.create_op(op); - match self - .driver - .push(&mut unsafe { Key::::new_unchecked(op.user_data()) }) - { - Poll::Pending => PushEntry::Pending(op), - Poll::Ready(res) => { - op.set_result(res); - // SAFETY: just completed. - PushEntry::Ready(unsafe { op.into_inner() }) - } - } - } - - /// Poll the driver and get completed entries. - /// You need to call [`Proactor::pop`] to get the pushed - /// operations. - pub fn poll(&self, timeout: Option, f: F) -> io::Result<()> { - unsafe { self.driver.poll(timeout, f) } - } - - /// Get the pushed operations from the completion entries. - /// - /// # Panics - /// This function will panic if the requested operation has not been - /// completed. - pub fn pop(&self, op: Key) -> PushEntry, (io::Result, T)> { - if op.has_result() { - // SAFETY: completed. - PushEntry::Ready(unsafe { op.into_inner() }) - } else { - PushEntry::Pending(op) - } - } - - /// Update the waker of the specified op. - pub fn update_waker(&self, op: &mut Key, waker: Waker) { - op.set_waker(waker); - } - - /// Create a notify handle to interrupt the inner driver. - pub fn handle(&self) -> NotifyHandle { - self.driver.handle() - } - - pub fn register_handler(&self, f: F) - where - F: FnOnce(DriverApi) -> Box, - { - self.driver.register_handler(f) - } -} - -#[cfg(unix)] -impl AsRawFd for Proactor { - fn as_raw_fd(&self) -> RawFd { - self.driver.as_raw_fd() - } -} - -/// An completed entry returned from kernel. -#[cfg(unix)] -#[derive(Debug)] -pub(crate) struct Entry { - user_data: usize, - result: io::Result, -} - -#[cfg(unix)] -impl Entry { - pub(crate) fn new(user_data: usize, result: io::Result) -> Self { - Self { user_data, result } - } - - /// The user-defined data returned by [`Proactor::push`]. - pub fn user_data(&self) -> usize { - self.user_data - } - - /// The result of the operation. - pub fn into_result(self) -> io::Result { - self.result - } - - /// SAFETY: `user_data` should be a valid pointer. - pub unsafe fn notify(self) { - let user_data = self.user_data(); - let mut op = Key::<()>::new_unchecked(user_data); - if op.set_result(self.into_result()) { - // SAFETY: completed and cancelled. - let _ = op.into_box(); - } - } -} - -#[cfg(unix)] -#[derive(Debug, Clone)] -enum ThreadPoolBuilder { - Create { limit: usize, recv_limit: Duration }, - Reuse(AsyncifyPool), -} - -#[cfg(unix)] -impl Default for ThreadPoolBuilder { - fn default() -> Self { - Self::new() - } -} - -#[cfg(unix)] -impl ThreadPoolBuilder { - pub fn new() -> Self { - Self::Create { - limit: 256, - recv_limit: Duration::from_secs(60), - } - } - - pub fn create_or_reuse(&self) -> AsyncifyPool { - match self { - Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit), - Self::Reuse(pool) => pool.clone(), - } - } -} - -/// Builder for [`Proactor`]. -#[cfg(unix)] -#[derive(Debug, Clone)] -pub struct ProactorBuilder { - capacity: u32, - pool_builder: ThreadPoolBuilder, -} - -#[cfg(unix)] -impl Default for ProactorBuilder { - fn default() -> Self { - Self::new() - } -} - -#[cfg(unix)] -impl ProactorBuilder { - /// Create the builder with default config. - pub fn new() -> Self { - Self { - capacity: 1024, - pool_builder: ThreadPoolBuilder::new(), - } - } - - /// Set the capacity of the inner event queue or submission queue, if - /// exists. The default value is 1024. - pub fn capacity(&mut self, capacity: u32) -> &mut Self { - self.capacity = capacity; - self - } - - /// Set the thread number limit of the inner thread pool, if exists. The - /// default value is 256. - /// - /// It will be ignored if `reuse_thread_pool` is set. - pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self { - if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder { - *limit = value; - } - self - } - - /// Set the waiting timeout of the inner thread, if exists. The default is - /// 60 seconds. - /// - /// It will be ignored if `reuse_thread_pool` is set. - pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self { - if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder { - *recv_limit = timeout; - } - self - } - - /// Set to reuse an existing [`AsyncifyPool`] in this proactor. - pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self { - self.pool_builder = ThreadPoolBuilder::Reuse(pool); - self - } - - /// Force reuse the thread pool for each proactor created by this builder, - /// even `reuse_thread_pool` is not set. - pub fn force_reuse_thread_pool(&mut self) -> &mut Self { - self.reuse_thread_pool(self.create_or_get_thread_pool()); - self - } - - /// Create or reuse the thread pool from the config. - pub fn create_or_get_thread_pool(&self) -> AsyncifyPool { - self.pool_builder.create_or_reuse() - } - - /// Build the [`Proactor`]. - pub fn build(&self) -> io::Result { - Proactor::with_builder(self) - } -} diff --git a/ntex-neon/src/driver/op.rs b/ntex-neon/src/driver/op.rs deleted file mode 100644 index 65769550..00000000 --- a/ntex-neon/src/driver/op.rs +++ /dev/null @@ -1,45 +0,0 @@ -//! The async operations. -//! -//! Types in this mod represents the low-level operations passed to kernel. -//! The operation itself doesn't perform anything. -//! You need to pass them to [`crate::Proactor`], and poll the driver. - -use std::{marker::PhantomPinned, net::Shutdown}; - -#[cfg(unix)] -pub use super::sys::op::*; - -/// Spawn a blocking function in the thread pool. -pub struct Asyncify { - pub(crate) f: Option, - pub(crate) data: Option, - _p: PhantomPinned, -} - -impl Asyncify { - /// Create [`Asyncify`]. - pub fn new(f: F) -> Self { - Self { - f: Some(f), - data: None, - _p: PhantomPinned, - } - } - - pub fn into_inner(mut self) -> D { - self.data.take().expect("the data should not be None") - } -} - -/// Shutdown a socket. -pub struct ShutdownSocket { - pub(crate) fd: S, - pub(crate) how: Shutdown, -} - -impl ShutdownSocket { - /// Create [`ShutdownSocket`]. - pub fn new(fd: S, how: Shutdown) -> Self { - Self { fd, how } - } -} diff --git a/ntex-neon/src/driver/poll/mod.rs b/ntex-neon/src/driver/poll/mod.rs deleted file mode 100644 index 39162a14..00000000 --- a/ntex-neon/src/driver/poll/mod.rs +++ /dev/null @@ -1,456 +0,0 @@ -#![allow(clippy::type_complexity)] -pub use std::os::fd::{AsRawFd, OwnedFd, RawFd}; - -use std::{cell::Cell, cell::RefCell, io, rc::Rc, sync::Arc}; -use std::{num::NonZeroUsize, os::fd::BorrowedFd, pin::Pin, task::Poll, time::Duration}; - -use crossbeam_queue::SegQueue; -use nohash_hasher::IntMap; -use polling::{Event, Events, Poller}; - -use crate::driver::{op::Interest, sys, AsyncifyPool, Entry, Key, ProactorBuilder}; - -pub(crate) mod op; - -/// Abstraction of operations. -pub trait OpCode { - /// Perform the operation before submit, and return [`Decision`] to - /// indicate whether submitting the operation to polling is required. - fn pre_submit(self: Pin<&mut Self>) -> io::Result; - - /// Perform the operation after received corresponding - /// event. If this operation is blocking, the return value should be - /// [`Poll::Ready`]. - fn operate(self: Pin<&mut Self>) -> Poll>; -} - -/// Result of [`OpCode::pre_submit`]. -#[non_exhaustive] -pub enum Decision { - /// Instant operation, no need to submit - Completed(usize), - /// Blocking operation, needs to be spawned in another thread - Blocking, -} - -bitflags::bitflags! { - #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] - struct Flags: u8 { - const NEW = 0b0000_0001; - const CHANGED = 0b0000_0010; - } -} - -#[derive(Debug)] -struct FdItem { - flags: Flags, - batch: usize, - read: Option, - write: Option, -} - -impl FdItem { - fn new(batch: usize) -> Self { - Self { - batch, - read: None, - write: None, - flags: Flags::NEW, - } - } - - fn register(&mut self, user_data: usize, interest: Interest) { - self.flags.insert(Flags::CHANGED); - match interest { - Interest::Readable => { - self.read = Some(user_data); - } - Interest::Writable => { - self.write = Some(user_data); - } - } - } - - fn unregister(&mut self, int: Interest) { - let res = match int { - Interest::Readable => self.read.take(), - Interest::Writable => self.write.take(), - }; - if res.is_some() { - self.flags.insert(Flags::CHANGED); - } - } - - fn unregister_all(&mut self) { - if self.read.is_some() || self.write.is_some() { - self.flags.insert(Flags::CHANGED); - } - - let _ = self.read.take(); - let _ = self.write.take(); - } - - fn user_data(&mut self, interest: Interest) -> Option { - match interest { - Interest::Readable => self.read, - Interest::Writable => self.write, - } - } - - fn event(&self, key: usize) -> Event { - let mut event = Event::none(key); - if self.read.is_some() { - event.readable = true; - } - if self.write.is_some() { - event.writable = true; - } - event - } -} - -#[derive(Debug)] -enum Change { - Register { - fd: RawFd, - batch: usize, - user_data: usize, - int: Interest, - }, - Unregister { - fd: RawFd, - batch: usize, - int: Interest, - }, - UnregisterAll { - fd: RawFd, - batch: usize, - }, - Blocking { - user_data: usize, - }, -} - -pub struct DriverApi { - batch: usize, - changes: Rc>>, -} - -impl DriverApi { - pub fn register(&self, fd: RawFd, user_data: usize, int: Interest) { - log::debug!( - "Register interest {:?} for {:?} user-data: {:?}", - int, - fd, - user_data - ); - self.change(Change::Register { - fd, - batch: self.batch, - user_data, - int, - }); - } - - pub fn unregister(&self, fd: RawFd, int: Interest) { - log::debug!( - "Unregister interest {:?} for {:?} batch: {:?}", - int, - fd, - self.batch - ); - self.change(Change::Unregister { - fd, - batch: self.batch, - int, - }); - } - - pub fn unregister_all(&self, fd: RawFd) { - self.change(Change::UnregisterAll { - fd, - batch: self.batch, - }); - } - - fn change(&self, change: Change) { - self.changes.borrow_mut().push(change); - } -} - -/// Low-level driver of polling. -pub(crate) struct Driver { - poll: Arc, - events: RefCell, - registry: RefCell>, - pool: AsyncifyPool, - pool_completed: Arc>, - - hid: Cell, - changes: Rc>>, - handlers: Cell>>>>, -} - -impl Driver { - pub fn new(builder: &ProactorBuilder) -> io::Result { - log::trace!("New poll driver"); - let entries = builder.capacity as usize; // for the sake of consistency, use u32 like iour - let events = if entries == 0 { - Events::new() - } else { - Events::with_capacity(NonZeroUsize::new(entries).unwrap()) - }; - - Ok(Self { - poll: Arc::new(Poller::new()?), - events: RefCell::new(events), - registry: RefCell::new(Default::default()), - pool: builder.create_or_get_thread_pool(), - pool_completed: Arc::new(SegQueue::new()), - hid: Cell::new(0), - changes: Rc::new(RefCell::new(Vec::with_capacity(16))), - handlers: Cell::new(Some(Box::new(Vec::default()))), - }) - } - - pub fn register_handler(&self, f: F) - where - F: FnOnce(DriverApi) -> Box, - { - let id = self.hid.get(); - let mut handlers = self.handlers.take().unwrap_or_default(); - - let api = DriverApi { - batch: id, - changes: self.changes.clone(), - }; - handlers.push(f(api)); - self.hid.set(id + 1); - self.handlers.set(Some(handlers)); - } - - pub fn create_op(&self, op: T) -> Key { - Key::new(self.as_raw_fd(), op) - } - - pub fn push(&self, op: &mut Key) -> Poll> { - let user_data = op.user_data(); - let op_pin = op.as_op_pin(); - match op_pin.pre_submit()? { - Decision::Completed(res) => Poll::Ready(Ok(res)), - Decision::Blocking => { - self.changes - .borrow_mut() - .push(Change::Blocking { user_data }); - Poll::Pending - } - } - } - - pub unsafe fn poll( - &self, - timeout: Option, - f: F, - ) -> io::Result<()> { - if self.poll_blocking() { - f(); - self.apply_changes()?; - return Ok(()); - } - - let mut events = self.events.borrow_mut(); - self.poll.wait(&mut events, timeout)?; - - if events.is_empty() { - if timeout.is_some() && timeout != Some(Duration::ZERO) { - return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)); - } - } else { - // println!("POLL, events: {:?}", events.len()); - let mut registry = self.registry.borrow_mut(); - let mut handlers = self.handlers.take().unwrap(); - for event in events.iter() { - let fd = event.key as RawFd; - log::debug!("Event {:?} for {:?}", event, registry.get(&fd)); - - if let Some(item) = registry.get_mut(&fd) { - if event.readable { - if let Some(user_data) = item.user_data(Interest::Readable) { - handlers[item.batch].readable(user_data) - } - } - if event.writable { - if let Some(user_data) = item.user_data(Interest::Writable) { - handlers[item.batch].writable(user_data) - } - } - } - } - self.handlers.set(Some(handlers)); - } - - // apply changes - self.apply_changes()?; - - // complete batch handling - let mut handlers = self.handlers.take().unwrap(); - for handler in handlers.iter_mut() { - handler.commit(); - } - self.handlers.set(Some(handlers)); - self.apply_changes()?; - - // run user function - f(); - - // check if we have more changes from "run" - self.apply_changes()?; - - Ok(()) - } - - /// re-calc driver changes - unsafe fn apply_changes(&self) -> io::Result<()> { - let mut changes = self.changes.borrow_mut(); - if changes.is_empty() { - return Ok(()); - } - log::debug!("Apply driver changes, {:?}", changes.len()); - - let mut registry = self.registry.borrow_mut(); - - for change in &mut *changes { - match change { - Change::Register { - fd, - batch, - user_data, - int, - } => { - let item = registry.entry(*fd).or_insert_with(|| FdItem::new(*batch)); - item.register(*user_data, *int); - } - Change::Unregister { fd, batch, int } => { - let item = registry.entry(*fd).or_insert_with(|| FdItem::new(*batch)); - item.unregister(*int); - } - Change::UnregisterAll { fd, batch } => { - let item = registry.entry(*fd).or_insert_with(|| FdItem::new(*batch)); - item.unregister_all(); - } - _ => {} - } - } - - for change in changes.drain(..) { - let fd = match change { - Change::Register { fd, .. } => Some(fd), - Change::Unregister { fd, .. } => Some(fd), - Change::UnregisterAll { fd, .. } => Some(fd), - Change::Blocking { user_data } => { - self.push_blocking(user_data); - None - } - }; - - if let Some(fd) = fd { - if let Some(item) = registry.get_mut(&fd) { - if item.flags.contains(Flags::CHANGED) { - item.flags.remove(Flags::CHANGED); - - let new = item.flags.contains(Flags::NEW); - let renew_event = item.event(fd as usize); - - if !renew_event.readable && !renew_event.writable { - // crate::log(format!("DELETE - {:?}", fd.as_raw_fd())); - registry.remove(&fd); - if !new { - self.poll.delete(BorrowedFd::borrow_raw(fd))?; - } - } else if new { - item.flags.remove(Flags::NEW); - // crate::log(format!("ADD - {:?}", fd.as_raw_fd())); - unsafe { self.poll.add(fd, renew_event)? }; - } else { - // crate::log(format!("MODIFY - {:?} {:?}", fd.as_raw_fd(), renew_event)); - self.poll.modify(BorrowedFd::borrow_raw(fd), renew_event)?; - } - } - } - } - } - - Ok(()) - } - - fn push_blocking(&self, user_data: usize) { - let poll = self.poll.clone(); - let completed = self.pool_completed.clone(); - let mut closure = move || { - let mut op = unsafe { Key::::new_unchecked(user_data) }; - let op_pin = op.as_op_pin(); - let res = match op_pin.operate() { - Poll::Pending => unreachable!("this operation is not non-blocking"), - Poll::Ready(res) => res, - }; - completed.push(Entry::new(user_data, res)); - poll.notify().ok(); - }; - while let Err(e) = self.pool.dispatch(closure) { - closure = e.0; - self.poll_blocking(); - } - } - - fn poll_blocking(&self) -> bool { - if self.pool_completed.is_empty() { - false - } else { - while let Some(entry) = self.pool_completed.pop() { - unsafe { - entry.notify(); - } - } - true - } - } - - /// Get notification handle - pub fn handle(&self) -> NotifyHandle { - NotifyHandle::new(self.poll.clone()) - } -} - -impl AsRawFd for Driver { - fn as_raw_fd(&self) -> RawFd { - self.poll.as_raw_fd() - } -} - -impl Drop for Driver { - fn drop(&mut self) { - for fd in self.registry.borrow().keys() { - unsafe { - let fd = BorrowedFd::borrow_raw(*fd); - self.poll.delete(fd).ok(); - } - } - } -} - -#[derive(Clone)] -/// A notify handle to the inner driver. -pub struct NotifyHandle { - poll: Arc, -} - -impl NotifyHandle { - fn new(poll: Arc) -> Self { - Self { poll } - } - - /// Notify the driver - pub fn notify(&self) -> io::Result<()> { - self.poll.notify() - } -} diff --git a/ntex-neon/src/driver/poll/op.rs b/ntex-neon/src/driver/poll/op.rs deleted file mode 100644 index 7b5ad728..00000000 --- a/ntex-neon/src/driver/poll/op.rs +++ /dev/null @@ -1,96 +0,0 @@ -use std::{io, marker::Send, mem, os::fd::FromRawFd, os::fd::RawFd, pin::Pin, task::Poll}; - -pub use crate::driver::unix::op::*; - -use super::{AsRawFd, Decision, OpCode, OwnedFd}; -use crate::{driver::op::*, syscall}; - -pub trait Handler { - /// Submitted interest - fn readable(&mut self, id: usize); - - /// Submitted interest - fn writable(&mut self, id: usize); - - /// Operation submission has failed - fn error(&mut self, id: usize, err: io::Error); - - /// All events are processed, process all updates - fn commit(&mut self); -} - -impl OpCode for Asyncify -where - D: Send + 'static, - F: (FnOnce() -> (io::Result, D)) + Send + 'static, -{ - fn pre_submit(self: Pin<&mut Self>) -> io::Result { - Ok(Decision::Blocking) - } - - fn operate(self: Pin<&mut Self>) -> Poll> { - // Safety: self won't be moved - let this = unsafe { self.get_unchecked_mut() }; - let f = this - .f - .take() - .expect("the operate method could only be called once"); - let (res, data) = f(); - this.data = Some(data); - Poll::Ready(res) - } -} - -/// Close socket fd. -pub struct CloseSocket { - pub(crate) fd: mem::ManuallyDrop, -} - -impl CloseSocket { - /// Create [`CloseSocket`]. - pub fn new(fd: OwnedFd) -> Self { - Self { - fd: mem::ManuallyDrop::new(fd), - } - } -} - -impl OpCode for CreateSocket { - fn pre_submit(self: Pin<&mut Self>) -> io::Result { - Ok(Decision::Blocking) - } - - fn operate(self: Pin<&mut Self>) -> Poll> { - Poll::Ready(Ok( - syscall!(libc::socket(self.domain, self.socket_type, self.protocol))? as _, - )) - } -} - -impl OpCode for ShutdownSocket { - fn pre_submit(self: Pin<&mut Self>) -> io::Result { - Ok(Decision::Blocking) - } - - fn operate(self: Pin<&mut Self>) -> Poll> { - Poll::Ready(Ok( - syscall!(libc::shutdown(self.fd.as_raw_fd(), self.how()))? as _, - )) - } -} - -impl CloseSocket { - pub fn from_raw_fd(fd: RawFd) -> Self { - Self::new(unsafe { FromRawFd::from_raw_fd(fd) }) - } -} - -impl OpCode for CloseSocket { - fn pre_submit(self: Pin<&mut Self>) -> io::Result { - Ok(Decision::Blocking) - } - - fn operate(self: Pin<&mut Self>) -> Poll> { - Poll::Ready(Ok(syscall!(libc::close(self.fd.as_raw_fd()))? as _)) - } -} diff --git a/ntex-neon/src/driver/unix/mod.rs b/ntex-neon/src/driver/unix/mod.rs deleted file mode 100644 index 1603d949..00000000 --- a/ntex-neon/src/driver/unix/mod.rs +++ /dev/null @@ -1,15 +0,0 @@ -//! This mod doesn't actually contain any driver, but meant to provide some -//! common op type and utilities for unix platform (for iour and polling). - -pub(crate) mod op; - -use crate::driver::RawFd; - -/// The overlapped struct for unix needn't contain extra fields. -pub(crate) struct Overlapped; - -impl Overlapped { - pub fn new(_driver: RawFd) -> Self { - Self - } -} diff --git a/ntex-neon/src/driver/unix/op.rs b/ntex-neon/src/driver/unix/op.rs deleted file mode 100644 index 4775e4c9..00000000 --- a/ntex-neon/src/driver/unix/op.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::net::Shutdown; - -use crate::driver::op::*; - -/// The interest to poll a file descriptor. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum Interest { - /// Represents a read operation. - Readable, - /// Represents a write operation. - Writable, -} - -/// Create a socket. -pub struct CreateSocket { - pub(crate) domain: i32, - pub(crate) socket_type: i32, - pub(crate) protocol: i32, -} - -impl CreateSocket { - /// Create [`CreateSocket`]. - pub fn new(domain: i32, socket_type: i32, protocol: i32) -> Self { - Self { - domain, - socket_type, - protocol, - } - } -} - -impl ShutdownSocket { - pub(crate) fn how(&self) -> i32 { - match self.how { - Shutdown::Write => libc::SHUT_WR, - Shutdown::Read => libc::SHUT_RD, - Shutdown::Both => libc::SHUT_RDWR, - } - } -} diff --git a/ntex-neon/src/driver/uring/mod.rs b/ntex-neon/src/driver/uring/mod.rs deleted file mode 100644 index cd3c8354..00000000 --- a/ntex-neon/src/driver/uring/mod.rs +++ /dev/null @@ -1,340 +0,0 @@ -pub use std::os::fd::{AsRawFd, OwnedFd, RawFd}; - -use std::cell::{Cell, RefCell}; -use std::{ - collections::VecDeque, io, pin::Pin, rc::Rc, sync::Arc, task::Poll, time::Duration, -}; - -use crossbeam_queue::SegQueue; -use io_uring::cqueue::{more, Entry as CEntry}; -use io_uring::opcode::{AsyncCancel, PollAdd}; -use io_uring::squeue::Entry as SEntry; -use io_uring::types::{Fd, SubmitArgs, Timespec}; -use io_uring::IoUring; - -mod notify; -pub(crate) mod op; - -pub use self::notify::NotifyHandle; - -use self::notify::Notifier; -use crate::driver::{sys, AsyncifyPool, Entry, Key, ProactorBuilder}; - -/// Abstraction of io-uring operations. -pub trait OpCode { - /// Name of the operation - fn name(&self) -> &'static str; - - /// Call the operation in a blocking way. This method will only be called if - /// [`create_entry`] returns [`OpEntry::Blocking`]. - fn call_blocking(self: Pin<&mut Self>) -> io::Result { - unreachable!("this operation is asynchronous") - } - - /// Set the result when it successfully completes. - /// The operation stores the result and is responsible to release it if the - /// operation is cancelled. - /// - /// # Safety - /// - /// Users should not call it. - unsafe fn set_result(self: Pin<&mut Self>, _: usize) {} -} - -#[derive(Debug)] -enum Change { - Submit { entry: SEntry }, - Cancel { op_id: u64 }, -} - -pub struct DriverApi { - batch: u64, - changes: Rc>>, -} - -impl DriverApi { - pub fn submit(&self, user_data: u32, entry: SEntry) { - log::debug!( - "Submit operation batch: {:?} user-data: {:?} entry: {:?}", - self.batch >> Driver::BATCH, - user_data, - entry, - ); - self.changes.borrow_mut().push_back(Change::Submit { - entry: entry.user_data(user_data as u64 | self.batch), - }); - } - - pub fn cancel(&self, op_id: u32) { - log::debug!( - "Cancel operation batch: {:?} user-data: {:?}", - self.batch >> Driver::BATCH, - op_id - ); - self.changes.borrow_mut().push_back(Change::Cancel { - op_id: op_id as u64 | self.batch, - }); - } -} - -/// Low-level driver of io-uring. -pub(crate) struct Driver { - ring: RefCell>, - notifier: Notifier, - pool: AsyncifyPool, - pool_completed: Arc>, - - hid: Cell, - changes: Rc>>, - handlers: Cell>>>>, -} - -impl Driver { - const NOTIFY: u64 = u64::MAX; - const CANCEL: u64 = u64::MAX - 1; - const BATCH: u64 = 48; - const BATCH_MASK: u64 = 0xFFFF_0000_0000_0000; - const DATA_MASK: u64 = 0x0000_FFFF_FFFF_FFFF; - - pub fn new(builder: &ProactorBuilder) -> io::Result { - log::trace!("New io-uring driver"); - - let mut ring = IoUring::builder() - .setup_coop_taskrun() - .setup_single_issuer() - .build(builder.capacity)?; - - let notifier = Notifier::new()?; - - #[allow(clippy::useless_conversion)] - unsafe { - ring.submission() - .push( - &PollAdd::new(Fd(notifier.as_raw_fd()), libc::POLLIN as _) - .multi(true) - .build() - .user_data(Self::NOTIFY) - .into(), - ) - .expect("the squeue sould not be full"); - } - Ok(Self { - notifier, - ring: RefCell::new(ring), - pool: builder.create_or_get_thread_pool(), - pool_completed: Arc::new(SegQueue::new()), - - hid: Cell::new(0), - changes: Rc::new(RefCell::new(VecDeque::new())), - handlers: Cell::new(Some(Box::new(Vec::new()))), - }) - } - - pub fn register_handler(&self, f: F) - where - F: FnOnce(DriverApi) -> Box, - { - let id = self.hid.get(); - let mut handlers = self.handlers.take().unwrap_or_default(); - - let api = DriverApi { - batch: id << 48, - changes: self.changes.clone(), - }; - handlers.push(f(api)); - self.hid.set(id + 1); - self.handlers.set(Some(handlers)); - } - - // Auto means that it choose to wait or not automatically. - fn submit_auto(&self, timeout: Option) -> io::Result<()> { - let mut ring = self.ring.borrow_mut(); - let res = { - // Last part of submission queue, wait till timeout. - if let Some(duration) = timeout { - let timespec = timespec(duration); - let args = SubmitArgs::new().timespec(×pec); - ring.submitter().submit_with_args(1, &args) - } else { - ring.submit_and_wait(1) - } - }; - match res { - Ok(_) => { - // log::debug!("Submit result: {res:?} {:?}", timeout); - if ring.completion().is_empty() { - Err(io::ErrorKind::TimedOut.into()) - } else { - Ok(()) - } - } - Err(e) => match e.raw_os_error() { - Some(libc::ETIME) => { - if timeout.is_some() && timeout != Some(Duration::ZERO) { - Err(io::ErrorKind::TimedOut.into()) - } else { - Ok(()) - } - } - Some(libc::EBUSY) | Some(libc::EAGAIN) => { - Err(io::ErrorKind::Interrupted.into()) - } - _ => Err(e), - }, - } - } - - pub fn create_op(&self, op: T) -> Key { - Key::new(self.as_raw_fd(), op) - } - - fn apply_changes(&self) -> bool { - let mut changes = self.changes.borrow_mut(); - if changes.is_empty() { - return false; - } - log::debug!("Apply changes, {:?}", changes.len()); - - let mut ring = self.ring.borrow_mut(); - let mut squeue = ring.submission(); - - while let Some(change) = changes.pop_front() { - match change { - Change::Submit { entry } => { - if unsafe { squeue.push(&entry) }.is_err() { - changes.push_front(Change::Submit { entry }); - break; - } - } - Change::Cancel { op_id } => { - let entry = AsyncCancel::new(op_id).build().user_data(Self::CANCEL); - if unsafe { squeue.push(&entry) }.is_err() { - changes.push_front(Change::Cancel { op_id }); - break; - } - } - } - } - squeue.sync(); - - !changes.is_empty() - } - - pub unsafe fn poll( - &self, - timeout: Option, - f: F, - ) -> io::Result<()> { - self.poll_blocking(); - - let has_more = self.apply_changes(); - let poll_result = self.poll_completions(); - - if !poll_result || has_more { - if has_more { - self.submit_auto(Some(Duration::ZERO))?; - } else { - self.submit_auto(timeout)?; - } - self.poll_completions(); - } - - f(); - - Ok(()) - } - - pub fn push(&self, op: &mut Key) -> Poll> { - log::trace!("Push op: {:?}", op.as_op_pin().name()); - - let user_data = op.user_data(); - loop { - if self.push_blocking(user_data) { - break Poll::Pending; - } else { - self.poll_blocking(); - } - } - } - - fn poll_completions(&self) -> bool { - let mut ring = self.ring.borrow_mut(); - let mut cqueue = ring.completion(); - cqueue.sync(); - let has_entry = !cqueue.is_empty(); - if !has_entry { - return false; - } - let mut handlers = self.handlers.take().unwrap(); - for entry in cqueue { - let user_data = entry.user_data(); - match user_data { - Self::CANCEL => {} - Self::NOTIFY => { - let flags = entry.flags(); - debug_assert!(more(flags)); - self.notifier.clear().expect("cannot clear notifier"); - } - _ => { - let batch = ((user_data & Self::BATCH_MASK) >> Self::BATCH) as usize; - let user_data = (user_data & Self::DATA_MASK) as usize; - - let result = entry.result(); - - if result == -libc::ECANCELED { - handlers[batch].canceled(user_data); - } else { - let result = if result < 0 { - Err(io::Error::from_raw_os_error(result)) - } else { - Ok(result as _) - }; - handlers[batch].completed(user_data, entry.flags(), result); - } - } - } - } - self.handlers.set(Some(handlers)); - true - } - - fn poll_blocking(&self) { - if !self.pool_completed.is_empty() { - while let Some(entry) = self.pool_completed.pop() { - unsafe { - entry.notify(); - } - } - } - } - - fn push_blocking(&self, user_data: usize) -> bool { - let handle = self.handle(); - let completed = self.pool_completed.clone(); - self.pool - .dispatch(move || { - let mut op = unsafe { Key::::new_unchecked(user_data) }; - let op_pin = op.as_op_pin(); - let res = op_pin.call_blocking(); - completed.push(Entry::new(user_data, res)); - handle.notify().ok(); - }) - .is_ok() - } - - pub fn handle(&self) -> NotifyHandle { - self.notifier.handle() - } -} - -impl AsRawFd for Driver { - fn as_raw_fd(&self) -> RawFd { - self.ring.borrow().as_raw_fd() - } -} - -fn timespec(duration: std::time::Duration) -> Timespec { - Timespec::new() - .sec(duration.as_secs()) - .nsec(duration.subsec_nanos()) -} diff --git a/ntex-neon/src/driver/uring/notify.rs b/ntex-neon/src/driver/uring/notify.rs deleted file mode 100644 index faa9cae0..00000000 --- a/ntex-neon/src/driver/uring/notify.rs +++ /dev/null @@ -1,73 +0,0 @@ -use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd}; -use std::{io, mem, sync::Arc}; - -use crate::syscall; - -#[derive(Debug)] -pub(crate) struct Notifier { - fd: Arc, -} - -impl Notifier { - /// Create a new notifier. - pub(crate) fn new() -> io::Result { - let fd = syscall!(libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?; - let fd = unsafe { OwnedFd::from_raw_fd(fd) }; - Ok(Self { fd: Arc::new(fd) }) - } - - pub(crate) fn clear(&self) -> io::Result<()> { - loop { - let mut buffer = [0u64]; - let res = syscall!(libc::read( - self.fd.as_raw_fd(), - buffer.as_mut_ptr().cast(), - mem::size_of::() - )); - match res { - Ok(len) => { - debug_assert_eq!(len, mem::size_of::() as _); - break Ok(()); - } - // Clear the next time:) - Err(e) if e.kind() == io::ErrorKind::WouldBlock => break Ok(()), - // Just like read_exact - Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, - Err(e) => break Err(e), - } - } - } - - pub(crate) fn handle(&self) -> NotifyHandle { - NotifyHandle::new(self.fd.clone()) - } -} - -impl AsRawFd for Notifier { - fn as_raw_fd(&self) -> RawFd { - self.fd.as_raw_fd() - } -} - -#[derive(Clone)] -/// A notify handle to the inner driver. -pub struct NotifyHandle { - fd: Arc, -} - -impl NotifyHandle { - pub(crate) fn new(fd: Arc) -> Self { - Self { fd } - } - - /// Notify the inner driver. - pub fn notify(&self) -> io::Result<()> { - let data = 1u64; - syscall!(libc::write( - self.fd.as_raw_fd(), - &data as *const _ as *const _, - std::mem::size_of::(), - ))?; - Ok(()) - } -} diff --git a/ntex-neon/src/driver/uring/op.rs b/ntex-neon/src/driver/uring/op.rs deleted file mode 100644 index e4256af4..00000000 --- a/ntex-neon/src/driver/uring/op.rs +++ /dev/null @@ -1,55 +0,0 @@ -use std::{io, os::fd::AsRawFd, pin::Pin}; - -pub use crate::driver::unix::op::*; - -use super::OpCode; -use crate::{driver::op::*, syscall}; - -pub trait Handler { - /// Operation is completed - fn completed(&mut self, user_data: usize, flags: u32, result: io::Result); - - fn canceled(&mut self, user_data: usize); -} - -impl OpCode for Asyncify -where - D: Send + 'static, - F: (FnOnce() -> (io::Result, D)) + Send + 'static, -{ - fn name(&self) -> &'static str { - "Asyncify" - } - - fn call_blocking(self: Pin<&mut Self>) -> std::io::Result { - // Safety: self won't be moved - let this = unsafe { self.get_unchecked_mut() }; - let f = this - .f - .take() - .expect("the operate method could only be called once"); - let (res, data) = f(); - this.data = Some(data); - res - } -} - -impl OpCode for CreateSocket { - fn name(&self) -> &'static str { - "CreateSocket" - } - - fn call_blocking(self: Pin<&mut Self>) -> io::Result { - Ok(syscall!(libc::socket(self.domain, self.socket_type, self.protocol))? as _) - } -} - -impl OpCode for ShutdownSocket { - fn name(&self) -> &'static str { - "ShutdownSocket" - } - - fn call_blocking(self: Pin<&mut Self>) -> io::Result { - Ok(syscall!(libc::shutdown(self.fd.as_raw_fd(), self.how()))? as _) - } -} diff --git a/ntex-neon/src/lib.rs b/ntex-neon/src/lib.rs deleted file mode 100644 index 3c85d7fc..00000000 --- a/ntex-neon/src/lib.rs +++ /dev/null @@ -1,14 +0,0 @@ -//! The async runtime for ntex. - -#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -// #![warn(missing_docs)] - -pub mod driver; -pub mod net; -mod op; -mod rt; - -pub use async_task::Task; -pub use rt::{ - spawn, spawn_blocking, submit, submit_with_flags, JoinHandle, Runtime, RuntimeBuilder, -}; diff --git a/ntex-neon/src/net/mod.rs b/ntex-neon/src/net/mod.rs deleted file mode 100644 index fc0d3179..00000000 --- a/ntex-neon/src/net/mod.rs +++ /dev/null @@ -1,11 +0,0 @@ -//! Network related. -//! -//! Currently, TCP/UDP/Unix socket are implemented. - -mod socket; -mod tcp; -mod unix; - -pub use socket::*; -pub use tcp::*; -pub use unix::*; diff --git a/ntex-neon/src/net/socket.rs b/ntex-neon/src/net/socket.rs deleted file mode 100644 index 09be0200..00000000 --- a/ntex-neon/src/net/socket.rs +++ /dev/null @@ -1,207 +0,0 @@ -#![allow(clippy::missing_safety_doc)] -use std::{io, mem::MaybeUninit}; - -use socket2::{Domain, Protocol, SockAddr, Socket as Socket2, Type}; - -use crate::{driver::AsRawFd, impl_raw_fd, syscall}; - -#[derive(Debug)] -pub struct Socket { - socket: Socket2, -} - -impl Socket { - pub fn from_socket2(socket: Socket2) -> io::Result { - #[cfg(unix)] - { - #[cfg(not(any( - target_os = "android", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "fuchsia", - target_os = "hurd", - target_os = "illumos", - target_os = "linux", - target_os = "netbsd", - target_os = "openbsd", - target_os = "espidf", - target_os = "vita", - )))] - socket.set_cloexec(true)?; - #[cfg(any( - target_os = "ios", - target_os = "macos", - target_os = "tvos", - target_os = "watchos", - ))] - socket.set_nosigpipe(true)?; - - // On Linux we use blocking socket - // Newer kernels have the patch that allows to arm io_uring poll mechanism for - // non blocking socket when there is no connections in listen queue - // - // https://patchwork.kernel.org/project/linux-block/patch/f999615b-205c-49b7-b272-c4e42e45e09d@kernel.dk/#22949861 - if cfg!(not(all(target_os = "linux", feature = "io-uring"))) - || crate::driver::DriverType::is_polling() - { - socket.set_nonblocking(true)?; - } - } - - Ok(Self { socket }) - } - - pub fn peer_addr(&self) -> io::Result { - self.socket.peer_addr() - } - - pub fn local_addr(&self) -> io::Result { - self.socket.local_addr() - } - - #[cfg(windows)] - pub async fn new( - domain: Domain, - ty: Type, - protocol: Option, - ) -> io::Result { - use std::panic::resume_unwind; - - let socket = crate::spawn_blocking(move || Socket2::new(domain, ty, protocol)) - .await - .unwrap_or_else(|e| resume_unwind(e))?; - Self::from_socket2(socket) - } - - #[cfg(unix)] - pub async fn new( - domain: Domain, - ty: Type, - protocol: Option, - ) -> io::Result { - use std::os::fd::FromRawFd; - - #[allow(unused_mut)] - let mut ty: i32 = ty.into(); - #[cfg(any( - target_os = "android", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "fuchsia", - target_os = "hurd", - target_os = "illumos", - target_os = "linux", - target_os = "netbsd", - target_os = "openbsd", - ))] - { - ty |= libc::SOCK_CLOEXEC; - } - - let op = crate::driver::op::CreateSocket::new( - domain.into(), - ty, - protocol.map(|p| p.into()).unwrap_or_default(), - ); - let (res, _) = crate::submit(op).await; - let socket = unsafe { Socket2::from_raw_fd(res? as _) }; - - Self::from_socket2(socket) - } - - pub async fn bind( - addr: &SockAddr, - ty: Type, - protocol: Option, - ) -> io::Result { - let socket = Self::new(addr.domain(), ty, protocol).await?; - socket.socket.bind(addr)?; - Ok(socket) - } - - #[cfg(unix)] - pub unsafe fn get_socket_option( - &self, - level: i32, - name: i32, - ) -> io::Result { - let mut value: MaybeUninit = MaybeUninit::uninit(); - let mut len = size_of::() as libc::socklen_t; - syscall!(libc::getsockopt( - self.socket.as_raw_fd(), - level, - name, - value.as_mut_ptr() as _, - &mut len - )) - .map(|_| { - debug_assert_eq!(len as usize, size_of::()); - // SAFETY: The value is initialized by `getsockopt`. - value.assume_init() - }) - } - - #[cfg(windows)] - pub unsafe fn get_socket_option( - &self, - level: i32, - name: i32, - ) -> io::Result { - let mut value: MaybeUninit = MaybeUninit::uninit(); - let mut len = size_of::() as i32; - syscall!( - SOCKET, - windows_sys::Win32::Networking::WinSock::getsockopt( - self.socket.as_raw_fd() as _, - level, - name, - value.as_mut_ptr() as _, - &mut len - ) - ) - .map(|_| { - debug_assert_eq!(len as usize, size_of::()); - // SAFETY: The value is initialized by `getsockopt`. - value.assume_init() - }) - } - - #[cfg(unix)] - pub unsafe fn set_socket_option( - &self, - level: i32, - name: i32, - value: &T, - ) -> io::Result<()> { - syscall!(libc::setsockopt( - self.socket.as_raw_fd(), - level, - name, - value as *const _ as _, - std::mem::size_of::() as _ - )) - .map(|_| ()) - } - - #[cfg(windows)] - pub unsafe fn set_socket_option( - &self, - level: i32, - name: i32, - value: &T, - ) -> io::Result<()> { - syscall!( - SOCKET, - windows_sys::Win32::Networking::WinSock::setsockopt( - self.socket.as_raw_fd() as _, - level, - name, - value as *const _ as _, - std::mem::size_of::() as _ - ) - ) - .map(|_| ()) - } -} - -impl_raw_fd!(Socket, Socket2, socket, socket); diff --git a/ntex-neon/src/net/tcp.rs b/ntex-neon/src/net/tcp.rs deleted file mode 100644 index 84b6c014..00000000 --- a/ntex-neon/src/net/tcp.rs +++ /dev/null @@ -1,44 +0,0 @@ -use std::{io, net::SocketAddr}; - -use socket2::Socket as Socket2; - -use crate::{impl_raw_fd, net::Socket}; - -/// A TCP stream between a local and a remote socket. -/// -/// A TCP stream can either be created by connecting to an endpoint, via the -/// `connect` method, or by accepting a connection from a listener. -#[derive(Debug)] -pub struct TcpStream { - inner: Socket, -} - -impl TcpStream { - /// Creates new TcpStream from a std::net::TcpStream. - pub fn from_std(stream: std::net::TcpStream) -> io::Result { - Ok(Self { - inner: Socket::from_socket2(Socket2::from(stream))?, - }) - } - - /// Creates new TcpStream from a std::net::TcpStream. - pub fn from_socket(inner: Socket) -> Self { - Self { inner } - } - - /// Returns the socket address of the remote peer of this TCP connection. - pub fn peer_addr(&self) -> io::Result { - self.inner - .peer_addr() - .map(|addr| addr.as_socket().expect("should be SocketAddr")) - } - - /// Returns the socket address of the local half of this TCP connection. - pub fn local_addr(&self) -> io::Result { - self.inner - .local_addr() - .map(|addr| addr.as_socket().expect("should be SocketAddr")) - } -} - -impl_raw_fd!(TcpStream, socket2::Socket, inner, socket); diff --git a/ntex-neon/src/net/unix.rs b/ntex-neon/src/net/unix.rs deleted file mode 100644 index 833865d0..00000000 --- a/ntex-neon/src/net/unix.rs +++ /dev/null @@ -1,91 +0,0 @@ -use std::io; - -use socket2::{SockAddr, Socket as Socket2}; - -use crate::{impl_raw_fd, net::Socket}; - -/// A Unix stream between two local sockets on Windows & WSL. -/// -/// A Unix stream can either be created by connecting to an endpoint, via the -/// `connect` method. -#[derive(Debug)] -pub struct UnixStream { - inner: Socket, -} - -impl UnixStream { - #[cfg(unix)] - /// Creates new UnixStream from a std::os::unix::net::UnixStream. - pub fn from_std(stream: std::os::unix::net::UnixStream) -> io::Result { - Ok(Self { - inner: Socket::from_socket2(Socket2::from(stream))?, - }) - } - - /// Creates new TcpStream from a Socket. - pub fn from_socket(inner: Socket) -> Self { - Self { inner } - } - - /// Returns the socket path of the remote peer of this connection. - pub fn peer_addr(&self) -> io::Result { - #[allow(unused_mut)] - let mut addr = self.inner.peer_addr()?; - #[cfg(windows)] - { - fix_unix_socket_length(&mut addr); - } - Ok(addr) - } - - /// Returns the socket path of the local half of this connection. - pub fn local_addr(&self) -> io::Result { - self.inner.local_addr() - } -} - -impl_raw_fd!(UnixStream, socket2::Socket, inner, socket); - -#[cfg(windows)] -#[inline] -fn empty_unix_socket() -> SockAddr { - use windows_sys::Win32::Networking::WinSock::{AF_UNIX, SOCKADDR_UN}; - - // SAFETY: the length is correct - unsafe { - SockAddr::try_init(|addr, len| { - let addr: *mut SOCKADDR_UN = addr.cast(); - std::ptr::write( - addr, - SOCKADDR_UN { - sun_family: AF_UNIX, - sun_path: [0; 108], - }, - ); - std::ptr::write(len, 3); - Ok(()) - }) - } - // it is always Ok - .unwrap() - .1 -} - -// The peer addr returned after ConnectEx is buggy. It contains bytes that -// should not belong to the address. Luckily a unix path should not contain `\0` -// until the end. We can determine the path ending by that. -#[cfg(windows)] -#[inline] -fn fix_unix_socket_length(addr: &mut SockAddr) { - use windows_sys::Win32::Networking::WinSock::SOCKADDR_UN; - - // SAFETY: cannot construct non-unix socket address in safe way. - let unix_addr: &SOCKADDR_UN = unsafe { &*addr.as_ptr().cast() }; - let addr_len = match std::ffi::CStr::from_bytes_until_nul(&unix_addr.sun_path) { - Ok(str) => str.to_bytes_with_nul().len() + 2, - Err(_) => std::mem::size_of::(), - }; - unsafe { - addr.set_length(addr_len as _); - } -} diff --git a/ntex-neon/src/op.rs b/ntex-neon/src/op.rs deleted file mode 100644 index 5138ed4c..00000000 --- a/ntex-neon/src/op.rs +++ /dev/null @@ -1,38 +0,0 @@ -use std::{future::Future, io, pin::Pin, task::Context, task::Poll}; - -use crate::driver::{Key, OpCode, PushEntry}; -use crate::rt::Runtime; - -#[derive(Debug)] -pub(crate) struct OpFuture { - key: Option>, -} - -impl OpFuture { - pub(crate) fn new(key: Key) -> Self { - Self { key: Some(key) } - } -} - -impl Future for OpFuture { - type Output = (io::Result, T); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let res = Runtime::with_current(|r| r.poll_task(cx, self.key.take().unwrap())); - match res { - PushEntry::Pending(key) => { - self.key = Some(key); - Poll::Pending - } - PushEntry::Ready(res) => Poll::Ready(res), - } - } -} - -impl Drop for OpFuture { - fn drop(&mut self) { - if let Some(key) = self.key.take() { - Runtime::with_current(|r| r.cancel_op(key)); - } - } -} diff --git a/ntex-neon/src/rt.rs b/ntex-neon/src/rt.rs deleted file mode 100644 index a4197671..00000000 --- a/ntex-neon/src/rt.rs +++ /dev/null @@ -1,418 +0,0 @@ -#![allow(clippy::type_complexity)] -use std::any::{Any, TypeId}; -use std::collections::{HashMap, VecDeque}; -use std::{ - cell::Cell, cell::RefCell, future::Future, io, sync::Arc, task::Context, thread, - time::Duration, -}; - -use async_task::{Runnable, Task}; -use crossbeam_queue::SegQueue; - -use crate::driver::{ - op::Asyncify, AsRawFd, Key, NotifyHandle, OpCode, Proactor, ProactorBuilder, PushEntry, - RawFd, -}; - -use crate::op::OpFuture; - -scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime); - -/// Type alias for `Task>>`, which resolves to an -/// `Err` when the spawned future panicked. -pub type JoinHandle = Task>>; - -pub struct RemoteHandle { - handle: NotifyHandle, - runnables: Arc, -} - -impl RemoteHandle { - /// Wake up runtime - pub fn notify(&self) { - self.handle.notify().ok(); - } - - /// Spawns a new asynchronous task, returning a [`Task`] for it. - /// - /// Spawning a task enables the task to execute concurrently to other tasks. - /// There is no guarantee that a spawned task will execute to completion. - pub fn spawn(&self, future: F) -> Task { - let runnables = self.runnables.clone(); - let handle = self.handle.clone(); - let schedule = move |runnable| { - runnables.schedule(runnable, &handle); - }; - let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) }; - runnable.schedule(); - task - } -} - -/// The async runtime for ntex. It is a thread local runtime, and cannot be -/// sent to other threads. -pub struct Runtime { - driver: Proactor, - runnables: Arc, - event_interval: usize, - data: RefCell, fxhash::FxBuildHasher>>, -} - -impl Runtime { - /// Create [`Runtime`] with default config. - pub fn new() -> io::Result { - Self::builder().build() - } - - /// Create a builder for [`Runtime`]. - pub fn builder() -> RuntimeBuilder { - RuntimeBuilder::new() - } - - #[allow(clippy::arc_with_non_send_sync)] - fn with_builder(builder: &RuntimeBuilder) -> io::Result { - Ok(Self { - driver: builder.proactor_builder.build()?, - runnables: Arc::new(RunnableQueue::new()), - event_interval: builder.event_interval, - data: RefCell::new(HashMap::default()), - }) - } - - /// Perform a function on the current runtime. - /// - /// ## Panics - /// - /// This method will panic if there are no running [`Runtime`]. - pub fn with_current T>(f: F) -> T { - #[cold] - fn not_in_neon_runtime() -> ! { - panic!("not in a neon runtime") - } - - if CURRENT_RUNTIME.is_set() { - CURRENT_RUNTIME.with(f) - } else { - not_in_neon_runtime() - } - } - - /// Get current driver - pub fn driver(&self) -> &Proactor { - &self.driver - } - - /// Get handle for current runtime - pub fn handle(&self) -> RemoteHandle { - RemoteHandle { - handle: self.driver.handle(), - runnables: self.runnables.clone(), - } - } - - /// Block on the future till it completes. - pub fn block_on(&self, future: F) -> F::Output { - CURRENT_RUNTIME.set(self, || { - let mut result = None; - unsafe { self.spawn_unchecked(async { result = Some(future.await) }) }.detach(); - - self.runnables.run(self.event_interval); - loop { - if let Some(result) = result.take() { - return result; - } - - self.poll_with_driver(self.runnables.has_tasks(), || { - self.runnables.run(self.event_interval); - }); - } - }) - } - - /// Spawns a new asynchronous task, returning a [`Task`] for it. - /// - /// Spawning a task enables the task to execute concurrently to other tasks. - /// There is no guarantee that a spawned task will execute to completion. - pub fn spawn(&self, future: F) -> Task { - unsafe { self.spawn_unchecked(future) } - } - - /// Spawns a new asynchronous task, returning a [`Task`] for it. - /// - /// # Safety - /// - /// The caller should ensure the captured lifetime is long enough. - pub unsafe fn spawn_unchecked(&self, future: F) -> Task { - let runnables = self.runnables.clone(); - let handle = self.driver.handle(); - let schedule = move |runnable| { - runnables.schedule(runnable, &handle); - }; - let (runnable, task) = async_task::spawn_unchecked(future, schedule); - runnable.schedule(); - task - } - - /// Spawns a blocking task in a new thread, and wait for it. - /// - /// The task will not be cancelled even if the future is dropped. - pub fn spawn_blocking(&self, f: F) -> JoinHandle - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - let op = Asyncify::new(move || { - let res = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)); - (Ok(0), res) - }); - // It is safe to use `submit` here because the task is spawned immediately. - unsafe { - let fut = self.submit_with_flags(op); - self.spawn_unchecked(async move { fut.await.1.into_inner() }) - } - } - - fn submit_raw( - &self, - op: T, - ) -> PushEntry, (io::Result, T)> { - self.driver.push(op) - } - - fn submit_with_flags( - &self, - op: T, - ) -> impl Future, T)> { - let fut = self.submit_raw(op); - - async move { - match fut { - PushEntry::Pending(user_data) => OpFuture::new(user_data).await, - PushEntry::Ready(res) => res, - } - } - } - - pub(crate) fn cancel_op(&self, op: Key) { - self.driver.cancel(op); - } - - pub(crate) fn poll_task( - &self, - cx: &mut Context, - op: Key, - ) -> PushEntry, (io::Result, T)> { - self.driver.pop(op).map_pending(|mut k| { - self.driver.update_waker(&mut k, cx.waker().clone()); - k - }) - } - - fn poll_with_driver(&self, has_tasks: bool, f: F) { - let timeout = if has_tasks { - Some(Duration::ZERO) - } else { - None - }; - - if let Err(e) = self.driver.poll(timeout, f) { - match e.kind() { - io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => { - log::debug!("expected error: {e}"); - } - _ => panic!("{e:?}"), - } - } - } - - /// Insert a type into this runtime. - pub fn insert(&self, val: T) { - self.data - .borrow_mut() - .insert(TypeId::of::(), Box::new(val)); - } - - /// Check if container contains entry - pub fn contains(&self) -> bool { - self.data.borrow().contains_key(&TypeId::of::()) - } - - /// Get a reference to a type previously inserted on this runtime. - pub fn get(&self) -> Option - where - T: Clone + 'static, - { - self.data - .borrow() - .get(&TypeId::of::()) - .and_then(|boxed| boxed.downcast_ref().cloned()) - } -} - -impl AsRawFd for Runtime { - fn as_raw_fd(&self) -> RawFd { - self.driver.as_raw_fd() - } -} - -impl Drop for Runtime { - fn drop(&mut self) { - CURRENT_RUNTIME.set(self, || { - while self.runnables.sync_runnables.pop().is_some() {} - loop { - let runnable = self.runnables.local_runnables.borrow_mut().pop_front(); - if runnable.is_none() { - break; - } - } - }) - } -} - -struct RunnableQueue { - id: thread::ThreadId, - idle: Cell, - local_runnables: RefCell>, - sync_runnables: SegQueue, -} - -impl RunnableQueue { - fn new() -> Self { - Self { - id: thread::current().id(), - idle: Cell::new(true), - local_runnables: RefCell::new(VecDeque::new()), - sync_runnables: SegQueue::new(), - } - } - - fn schedule(&self, runnable: Runnable, handle: &NotifyHandle) { - if self.id == thread::current().id() { - self.local_runnables.borrow_mut().push_back(runnable); - if self.idle.get() { - let _ = handle.notify(); - } - } else { - self.sync_runnables.push(runnable); - handle.notify().ok(); - } - } - - fn run(&self, event_interval: usize) { - self.idle.set(false); - for _ in 0..event_interval { - let task = self.local_runnables.borrow_mut().pop_front(); - if let Some(task) = task { - task.run(); - } else { - break; - } - } - - for _ in 0..event_interval { - if !self.sync_runnables.is_empty() { - if let Some(task) = self.sync_runnables.pop() { - task.run(); - continue; - } - } - break; - } - self.idle.set(true); - } - - fn has_tasks(&self) -> bool { - !(self.local_runnables.borrow().is_empty() && self.sync_runnables.is_empty()) - } -} - -/// Builder for [`Runtime`]. -#[derive(Debug, Clone)] -pub struct RuntimeBuilder { - proactor_builder: ProactorBuilder, - event_interval: usize, -} - -impl Default for RuntimeBuilder { - fn default() -> Self { - Self::new() - } -} - -impl RuntimeBuilder { - /// Create the builder with default config. - pub fn new() -> Self { - Self { - proactor_builder: ProactorBuilder::new(), - event_interval: 61, - } - } - - /// Replace proactor builder. - pub fn with_proactor(&mut self, builder: ProactorBuilder) -> &mut Self { - self.proactor_builder = builder; - self - } - - /// Sets the number of scheduler ticks after which the scheduler will poll - /// for external events (timers, I/O, and so on). - /// - /// A scheduler “tick” roughly corresponds to one poll invocation on a task. - pub fn event_interval(&mut self, val: usize) -> &mut Self { - self.event_interval = val; - self - } - - /// Build [`Runtime`]. - pub fn build(&self) -> io::Result { - Runtime::with_builder(self) - } -} - -/// Spawns a new asynchronous task, returning a [`Task`] for it. -/// -/// Spawning a task enables the task to execute concurrently to other tasks. -/// There is no guarantee that a spawned task will execute to completion. -/// -/// ## Panics -/// -/// This method doesn't create runtime. It tries to obtain the current runtime -/// by [`Runtime::with_current`]. -pub fn spawn(future: F) -> Task { - Runtime::with_current(|r| r.spawn(future)) -} - -/// Spawns a blocking task in a new thread, and wait for it. -/// -/// The task will not be cancelled even if the future is dropped. -/// -/// ## Panics -/// -/// This method doesn't create runtime. It tries to obtain the current runtime -/// by [`Runtime::with_current`]. -pub fn spawn_blocking( - f: impl (FnOnce() -> T) + Send + 'static, -) -> JoinHandle { - Runtime::with_current(|r| r.spawn_blocking(f)) -} - -/// Submit an operation to the current runtime, and return a future for it. -/// -/// ## Panics -/// -/// This method doesn't create runtime. It tries to obtain the current runtime -/// by [`Runtime::with_current`]. -pub async fn submit(op: T) -> (io::Result, T) { - submit_with_flags(op).await -} - -/// Submit an operation to the current runtime, and return a future for it with -/// flags. -/// -/// ## Panics -/// -/// This method doesn't create runtime. It tries to obtain the current runtime -/// by [`Runtime::with_current`]. -pub async fn submit_with_flags(op: T) -> (io::Result, T) { - Runtime::with_current(|r| r.submit_with_flags(op)).await -} diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 4154fca2..3ec61e56 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -25,10 +25,10 @@ tokio = ["ntex-rt/tokio", "ntex-tokio"] compio = ["ntex-rt/compio", "ntex-compio"] # neon runtime -neon = ["ntex-rt/neon", "ntex-neon/polling", "slab", "socket2"] +neon = ["ntex-rt/neon", "ntex-neon", "slab", "socket2"] -# neon io-uring runtime -neon-uring = ["ntex-rt/neon", "ntex-neon/io-uring", "io-uring", "slab", "socket2"] +polling = ["ntex-neon/polling", "dep:polling"] +io-uring = ["ntex-neon/io-uring", "dep:io-uring"] [dependencies] ntex-service = "3.3" @@ -43,15 +43,17 @@ ntex-compio = { version = "0.2.4", optional = true } ntex-neon = { version = "0.1.0", optional = true } bitflags = { workspace = true } +cfg-if = { workspace = true } log = { workspace = true } libc = { workspace = true } -thiserror = { workspace = true } slab = { workspace = true, optional = true } socket2 = { workspace = true, optional = true } +thiserror = { workspace = true } # Linux specific dependencies [target.'cfg(target_os = "linux")'.dependencies] io-uring = { workspace = true, optional = true } +polling = { workspace = true, optional = true } [dev-dependencies] ntex = "2" diff --git a/ntex-net/src/compat.rs b/ntex-net/src/compat.rs index 036ed756..fdc84f71 100644 --- a/ntex-net/src/compat.rs +++ b/ntex-net/src/compat.rs @@ -6,51 +6,18 @@ pub use ntex_tokio::{from_tcp_stream, tcp_connect, tcp_connect_in}; #[cfg(all(unix, feature = "tokio"))] pub use ntex_tokio::{from_unix_stream, unix_connect, unix_connect_in}; -#[cfg(all( - feature = "neon", - not(feature = "neon-uring"), - not(feature = "tokio"), - not(feature = "compio") -))] -pub use crate::rt_polling::{ - from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect, - unix_connect_in, -}; - -#[cfg(all( - feature = "neon-uring", - not(feature = "neon"), - not(feature = "tokio"), - not(feature = "compio") -))] -pub use crate::rt_uring::{ - from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect, - unix_connect_in, -}; - -#[cfg(all( - feature = "compio", - not(feature = "tokio"), - not(feature = "neon"), - not(feature = "neon-uring") -))] +#[cfg(all(feature = "compio", not(feature = "tokio"), not(feature = "neon")))] pub use ntex_compio::{from_tcp_stream, tcp_connect, tcp_connect_in}; #[cfg(all( unix, feature = "compio", not(feature = "tokio"), - not(feature = "neon"), - not(feature = "neon-uring") + not(feature = "neon") ))] pub use ntex_compio::{from_unix_stream, unix_connect, unix_connect_in}; -#[cfg(all( - not(feature = "tokio"), - not(feature = "compio"), - not(feature = "neon"), - not(feature = "neon-uring") -))] +#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))] mod no_rt { use ntex_io::Io; @@ -115,10 +82,5 @@ mod no_rt { } } -#[cfg(all( - not(feature = "tokio"), - not(feature = "compio"), - not(feature = "neon"), - not(feature = "neon-uring") -))] +#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))] pub use no_rt::*; diff --git a/ntex-net/src/lib.rs b/ntex-net/src/lib.rs index 448f7c65..7fbff539 100644 --- a/ntex-net/src/lib.rs +++ b/ntex-net/src/lib.rs @@ -8,20 +8,22 @@ pub mod connect; pub use ntex_io::Io; pub use ntex_rt::{spawn, spawn_blocking}; -pub use self::compat::*; - -#[cfg(all( - feature = "neon", - not(feature = "neon-uring"), - not(feature = "tokio"), - not(feature = "compio") -))] -mod rt_polling; - -#[cfg(all( - feature = "neon-uring", - not(feature = "neon"), - not(feature = "tokio"), - not(feature = "compio") -))] -mod rt_uring; +cfg_if::cfg_if! { + if #[cfg(all(feature = "neon", target_os = "linux", feature = "io-uring"))] { + #[path = "rt_uring/mod.rs"] + mod rt_impl; + pub use self::rt_impl::{ + from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect, + unix_connect_in, + }; + } else if #[cfg(all(unix, feature = "neon"))] { + #[path = "rt_polling/mod.rs"] + mod rt_impl; + pub use self::rt_impl::{ + from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect, + unix_connect_in, + }; + } else { + pub use self::compat::*; + } +} diff --git a/ntex-net/src/rt_uring/driver.rs b/ntex-net/src/rt_uring/driver.rs index 2dddea43..e3a9f0f1 100644 --- a/ntex-net/src/rt_uring/driver.rs +++ b/ntex-net/src/rt_uring/driver.rs @@ -1,8 +1,7 @@ use std::{cell::RefCell, fmt, io, mem, num::NonZeroU32, rc::Rc, task::Poll}; use io_uring::{opcode, squeue::Entry, types::Fd}; -use ntex_neon::driver::op::Handler; -use ntex_neon::driver::{AsRawFd, DriverApi}; +use ntex_neon::driver::{op::Handler, AsRawFd, DriverApi}; use ntex_neon::Runtime; use ntex_util::channel::oneshot; use slab::Slab; diff --git a/ntex-util/CHANGES.md b/ntex-util/CHANGES.md index d826c699..d15ad9e2 100644 --- a/ntex-util/CHANGES.md +++ b/ntex-util/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [2.10.0] - 2025-03-12 + +* Add "Inplace" channel + +* Expose "yield_to" helper + ## [2.9.0] - 2025-01-15 * Add EitherService/EitherServiceFactory diff --git a/ntex-util/Cargo.toml b/ntex-util/Cargo.toml index 96ceff5f..ef999259 100644 --- a/ntex-util/Cargo.toml +++ b/ntex-util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-util" -version = "2.9.0" +version = "2.10.0" authors = ["ntex contributors "] description = "Utilities for ntex framework" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-util/src/channel/inplace.rs b/ntex-util/src/channel/inplace.rs new file mode 100644 index 00000000..88a119fe --- /dev/null +++ b/ntex-util/src/channel/inplace.rs @@ -0,0 +1,81 @@ +//! A futures-aware bounded(1) channel. +use std::{cell::Cell, fmt, future::poll_fn, task::Context, task::Poll}; + +use crate::task::LocalWaker; + +/// Creates a new futures-aware, channel. +pub fn channel() -> Inplace { + Inplace { + value: Cell::new(None), + rx_task: LocalWaker::new(), + } +} + +/// A futures-aware bounded(1) channel. +pub struct Inplace { + value: Cell>, + rx_task: LocalWaker, +} + +// The channels do not ever project Pin to the inner T +impl Unpin for Inplace {} + +impl fmt::Debug for Inplace { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Inplace") + } +} + +impl Inplace { + /// Set a successful result. + /// + /// If the value is successfully enqueued for the remote end to receive, + /// then `Ok(())` is returned. If previose value is not consumed + /// then `Err` is returned with the value provided. + pub fn send(&self, val: T) -> Result<(), T> { + if let Some(v) = self.value.take() { + self.value.set(Some(v)); + Err(val) + } else { + self.value.set(Some(val)); + self.rx_task.wake(); + Ok(()) + } + } + + /// Wait until the oneshot is ready and return value + pub async fn recv(&self) -> T { + poll_fn(|cx| self.poll_recv(cx)).await + } + + /// Polls the oneshot to determine if value is ready + pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll { + // If we've got a value, then skip the logic below as we're done. + if let Some(val) = self.value.take() { + return Poll::Ready(val); + } + + // Check if sender is dropped and return error if it is. + self.rx_task.register(cx.waker()); + Poll::Pending + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::future::lazy; + + #[ntex_macros::rt_test2] + async fn test_inplace() { + let ch = channel(); + assert_eq!(lazy(|cx| ch.poll_recv(cx)).await, Poll::Pending); + + assert!(ch.send(1).is_ok()); + assert!(ch.send(2) == Err(2)); + assert_eq!(lazy(|cx| ch.poll_recv(cx)).await, Poll::Ready(1)); + + assert!(ch.send(1).is_ok()); + assert_eq!(ch.recv().await, 1); + } +} diff --git a/ntex-util/src/channel/mod.rs b/ntex-util/src/channel/mod.rs index a8652c6b..06e5f2f1 100644 --- a/ntex-util/src/channel/mod.rs +++ b/ntex-util/src/channel/mod.rs @@ -2,6 +2,7 @@ mod cell; pub mod condition; +pub mod inplace; pub mod mpsc; pub mod oneshot; pub mod pool; diff --git a/ntex-util/src/task.rs b/ntex-util/src/task.rs index a2c427ce..466715a6 100644 --- a/ntex-util/src/task.rs +++ b/ntex-util/src/task.rs @@ -91,7 +91,6 @@ impl fmt::Debug for LocalWaker { } } -#[doc(hidden)] /// Yields execution back to the current runtime. pub async fn yield_to() { use std::{future::Future, pin::Pin, task::Context, task::Poll}; diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 919cab00..0ead2c63 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -52,7 +52,7 @@ compio = ["ntex-net/compio"] neon = ["ntex-net/neon"] # neon runtime -neon-uring = ["ntex-net/neon-uring"] +neon-uring = ["ntex-net/neon", "ntex-net/io-uring"] # websocket support ws = ["dep:sha-1"] @@ -69,7 +69,7 @@ ntex-macros = "0.1" ntex-util = "2.8" ntex-bytes = "0.1.27" ntex-server = "2.7" -ntex-h2 = "1.8.1" +ntex-h2 = "1.8.6" ntex-rt = "0.4.27" ntex-io = "2.11" ntex-net = "2.5" diff --git a/ntex/tests/web_ws.rs b/ntex/tests/web_ws.rs index 2187321d..20e1d324 100644 --- a/ntex/tests/web_ws.rs +++ b/ntex/tests/web_ws.rs @@ -21,6 +21,8 @@ async fn service(msg: ws::Frame) -> Result, io::Error> { #[ntex::test] async fn web_ws() { + let _ = env_logger::try_init(); + let srv = test::server(|| { App::new().service(web::resource("/").route(web::to( |req: HttpRequest| async move {