Refactor uring feature (#518)

Nikolay Kim 2025-03-12 15:12:28 +05:00 committed by GitHub
parent db16b71c5f
commit 12afaa00ea
32 changed files with 125 additions and 3001 deletions

View file

@@ -8,7 +8,6 @@ members = [
"ntex-http",
"ntex-router",
"ntex-rt",
"ntex-neon",
"ntex-net",
"ntex-server",
"ntex-service",
@ -33,7 +32,6 @@ ntex = { path = "ntex" }
ntex-bytes = { path = "ntex-bytes" }
ntex-codec = { path = "ntex-codec" }
ntex-io = { path = "ntex-io" }
ntex-neon = { path = "ntex-neon" }
ntex-net = { path = "ntex-net" }
ntex-http = { path = "ntex-http" }
ntex-router = { path = "ntex-router" }
@@ -47,6 +45,8 @@ ntex-util = { path = "ntex-util" }
ntex-compio = { path = "ntex-compio" }
ntex-tokio = { path = "ntex-tokio" }
ntex-neon = { git = "https://github.com/ntex-rs/neon.git" }
[workspace.dependencies]
async-task = "4.5.0"
bitflags = "2"

View file

@@ -1,81 +0,0 @@
[package]
name = "ntex-neon"
version = "0.1.0"
description = "Async runtime for ntex"
categories = ["asynchronous"]
keywords = ["async", "runtime"]
edition = { workspace = true }
authors = { workspace = true }
readme = { workspace = true }
license = { workspace = true }
repository = { workspace = true }
[package.metadata.docs.rs]
all-features = true
default-target = "x86_64-unknown-linux-gnu"
rustdoc-args = ["--cfg", "docsrs"]
targets = [
"x86_64-pc-windows-gnu",
"x86_64-unknown-linux-gnu",
"x86_64-apple-darwin",
"aarch64-apple-ios",
"aarch64-linux-android",
"x86_64-unknown-dragonfly",
"x86_64-unknown-freebsd",
"x86_64-unknown-illumos",
"x86_64-unknown-netbsd",
"x86_64-unknown-openbsd",
]
[dependencies]
async-task = { workspace = true }
bitflags = { workspace = true }
cfg-if = { workspace = true }
crossbeam-queue = { workspace = true }
fxhash = { workspace = true }
nohash-hasher = { workspace = true }
log = { workspace = true }
scoped-tls = { workspace = true }
socket2 = { workspace = true, features = ["all"] }
# Windows specific dependencies
[target.'cfg(windows)'.dependencies]
aligned-array = "1.0.1"
windows-sys = { workspace = true, features = [
"Win32_Foundation",
"Win32_Networking_WinSock",
"Win32_Security",
"Win32_Storage_FileSystem",
"Win32_System_Console",
"Win32_System_IO",
"Win32_System_Pipes",
"Win32_System_SystemServices",
"Win32_System_Threading",
"Win32_System_WindowsProgramming",
] }
# Unix specific dependencies
[target.'cfg(unix)'.dependencies]
crossbeam-channel = { workspace = true }
crossbeam-queue = { workspace = true }
libc = { workspace = true }
polling = { workspace = true }
# Linux specific dependencies
[target.'cfg(target_os = "linux")'.dependencies]
io-uring = { workspace = true, optional = true }
polling = { workspace = true, optional = true }
# Other platform dependencies
[target.'cfg(all(not(target_os = "linux"), unix))'.dependencies]
polling = { workspace = true }
[target.'cfg(windows)'.dev-dependencies]
windows-sys = { workspace = true, features = ["Win32_UI_WindowsAndMessaging"] }
[build-dependencies]
cfg_aliases = { workspace = true }
[features]
default = ["polling"]
polling = ["dep:polling"]

View file

@@ -1,128 +0,0 @@
use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc};
use std::{fmt, time::Duration};
use crossbeam_channel::{bounded, Receiver, Sender, TrySendError};
/// An error that may be emitted when all worker threads are busy. It simply
/// returns the dispatchable value with a convenient [`fmt::Debug`] and
/// [`fmt::Display`] implementation.
#[derive(Copy, Clone, PartialEq, Eq)]
pub struct DispatchError<T>(pub T);
impl<T> DispatchError<T> {
/// Consume the error, yielding the dispatchable that failed to be sent.
pub fn into_inner(self) -> T {
self.0
}
}
impl<T> fmt::Debug for DispatchError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"DispatchError(..)".fmt(f)
}
}
impl<T> fmt::Display for DispatchError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"all threads are busy".fmt(f)
}
}
impl<T> std::error::Error for DispatchError<T> {}
type BoxedDispatchable = Box<dyn Dispatchable + Send>;
/// A trait for dispatching a closure. It's implemented for all `FnOnce() + Send
/// + 'static` but may also be implemented for any other types that are `Send`
/// and `'static`.
pub trait Dispatchable: Send + 'static {
/// Run the dispatchable
fn run(self: Box<Self>);
}
impl<F> Dispatchable for F
where
F: FnOnce() + Send + 'static,
{
fn run(self: Box<Self>) {
(*self)()
}
}
struct CounterGuard(Arc<AtomicUsize>);
impl Drop for CounterGuard {
fn drop(&mut self) {
self.0.fetch_sub(1, Ordering::AcqRel);
}
}
fn worker(
receiver: Receiver<BoxedDispatchable>,
counter: Arc<AtomicUsize>,
timeout: Duration,
) -> impl FnOnce() {
move || {
counter.fetch_add(1, Ordering::AcqRel);
let _guard = CounterGuard(counter);
while let Ok(f) = receiver.recv_timeout(timeout) {
f.run();
}
}
}
/// A thread pool to perform blocking operations in other threads.
#[derive(Debug, Clone)]
pub struct AsyncifyPool {
sender: Sender<BoxedDispatchable>,
receiver: Receiver<BoxedDispatchable>,
counter: Arc<AtomicUsize>,
thread_limit: usize,
recv_timeout: Duration,
}
impl AsyncifyPool {
/// Create [`AsyncifyPool`] with thread number limit and channel receive
/// timeout.
pub fn new(thread_limit: usize, recv_timeout: Duration) -> Self {
let (sender, receiver) = bounded(0);
Self {
sender,
receiver,
counter: Arc::new(AtomicUsize::new(0)),
thread_limit,
recv_timeout,
}
}
/// Send a dispatchable, usually a closure, to another thread. Users normally
/// should not call this directly. When all threads are busy and the thread
/// number limit has been reached, it returns an error containing the original
/// dispatchable.
pub fn dispatch<D: Dispatchable>(&self, f: D) -> Result<(), DispatchError<D>> {
match self.sender.try_send(Box::new(f) as BoxedDispatchable) {
Ok(_) => Ok(()),
Err(e) => match e {
TrySendError::Full(f) => {
if self.counter.load(Ordering::Acquire) >= self.thread_limit {
// Safety: the box was created from a value of type `D`, so casting back is valid
Err(DispatchError(*unsafe {
Box::from_raw(Box::into_raw(f).cast())
}))
} else {
std::thread::spawn(worker(
self.receiver.clone(),
self.counter.clone(),
self.recv_timeout,
));
self.sender.send(f).expect("the channel should not be full");
Ok(())
}
}
TrySendError::Disconnected(_) => {
unreachable!("receiver should not all disconnected")
}
},
}
}
}
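For context (not part of this diff): a minimal usage sketch of the pool, assuming crate-internal access; the thread limit, timeout and closure below are illustrative.
fn run_on_pool() {
    // Up to 4 worker threads; an idle worker exits after 30 seconds without work.
    let pool = AsyncifyPool::new(4, Duration::from_secs(30));
    let mut job = || println!("runs on a pool thread");
    // If every worker is busy and the limit is reached, the closure comes back
    // inside the DispatchError and can be retried.
    while let Err(e) = pool.dispatch(job) {
        job = e.into_inner();
        std::thread::yield_now();
    }
}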

View file

@@ -1,108 +0,0 @@
use std::sync::atomic::{AtomicU8, Ordering};
const UNINIT: u8 = u8::MAX;
const IO_URING: u8 = 0;
const POLLING: u8 = 1;
const IOCP: u8 = 2;
static DRIVER_TYPE: AtomicU8 = AtomicU8::new(UNINIT);
/// Representing underlying driver type the fusion driver is using
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DriverType {
/// Using `polling` driver
Poll = POLLING,
/// Using `io-uring` driver
IoUring = IO_URING,
/// Using `iocp` driver
IOCP = IOCP,
}
impl DriverType {
fn from_num(n: u8) -> Self {
match n {
IO_URING => Self::IoUring,
POLLING => Self::Poll,
IOCP => Self::IOCP,
_ => unreachable!("invalid driver type"),
}
}
/// Get the underlying driver type
fn get() -> DriverType {
cfg_if::cfg_if! {
if #[cfg(windows)] {
DriverType::IOCP
} else if #[cfg(all(target_os = "linux", feature = "polling", feature = "io-uring"))] {
use io_uring::opcode::*;
// Add more opcodes here if used
const USED_OP: &[u8] = &[
Read::CODE,
Readv::CODE,
Write::CODE,
Writev::CODE,
Fsync::CODE,
Accept::CODE,
Connect::CODE,
RecvMsg::CODE,
SendMsg::CODE,
AsyncCancel::CODE,
OpenAt::CODE,
Close::CODE,
Shutdown::CODE,
];
(|| {
let uring = io_uring::IoUring::new(2)?;
let mut probe = io_uring::Probe::new();
uring.submitter().register_probe(&mut probe)?;
if USED_OP.iter().all(|op| probe.is_supported(*op)) {
std::io::Result::Ok(DriverType::IoUring)
} else {
Ok(DriverType::Poll)
}
})()
.unwrap_or(DriverType::Poll) // Should we fail here?
} else if #[cfg(all(target_os = "linux", feature = "io-uring"))] {
DriverType::IoUring
} else if #[cfg(unix)] {
DriverType::Poll
} else {
compile_error!("unsupported platform");
}
}
}
/// Get the underlying driver type and cache it. Following calls will return
/// the cached value.
pub fn current() -> DriverType {
match DRIVER_TYPE.load(Ordering::Acquire) {
UNINIT => {}
x => return DriverType::from_num(x),
}
let dev_ty = Self::get();
DRIVER_TYPE.store(dev_ty as u8, Ordering::Release);
dev_ty
}
/// Check if the current driver is `polling`
pub fn is_polling() -> bool {
Self::current() == DriverType::Poll
}
/// Check if the current driver is `io-uring`
pub fn is_iouring() -> bool {
Self::current() == DriverType::IoUring
}
/// Check if the current driver is `iocp`
pub fn is_iocp() -> bool {
Self::current() == DriverType::IOCP
}
}
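Illustrative sketch (not in the diff) of how callers typically query the cached driver type:
fn log_driver() {
    // The first call probes the platform (on Linux, an io_uring opcode probe)
    // and caches the answer; later calls are a single atomic load.
    match DriverType::current() {
        DriverType::IoUring => log::info!("using the io-uring driver"),
        DriverType::Poll => log::info!("using the polling driver"),
        DriverType::IOCP => log::info!("using the IOCP driver"),
    }
}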

View file

@@ -1,215 +0,0 @@
use std::{io, marker::PhantomData, mem::MaybeUninit, pin::Pin, task::Waker};
use super::{OpCode, Overlapped, PushEntry, RawFd};
/// An operation with other needed information. It should be allocated on the
/// heap. The pointer to this struct is used as `user_data`, and on Windows, it
/// is used as the pointer to `OVERLAPPED`.
///
/// `*const RawOp<dyn OpCode>` can be obtained from any `Key<T: OpCode>` by
/// first casting `Key::user_data` to `*const RawOp<()>`, then upcast via
/// `opcode_dyn_mut`. This is done in [`Key::as_op_pin`].
#[repr(C)]
pub(crate) struct RawOp<T: ?Sized> {
header: Overlapped,
// The cancelled flag and the result here are manual reference counting. The driver holds the
// strong ref until it completes; the runtime holds the strong ref until the future is
// dropped.
cancelled: bool,
// The metadata in `*mut RawOp<dyn OpCode>`
metadata: usize,
result: PushEntry<Option<Waker>, io::Result<usize>>,
flags: u32,
op: T,
}
#[repr(C)]
union OpCodePtrRepr {
ptr: *mut RawOp<dyn OpCode>,
components: OpCodePtrComponents,
}
#[repr(C)]
#[derive(Clone, Copy)]
struct OpCodePtrComponents {
data_pointer: *mut (),
metadata: usize,
}
fn opcode_metadata<T: OpCode + 'static>() -> usize {
let mut op = MaybeUninit::<RawOp<T>>::uninit();
// SAFETY: same as `core::ptr::metadata`.
unsafe {
OpCodePtrRepr {
ptr: op.as_mut_ptr(),
}
.components
.metadata
}
}
const unsafe fn opcode_dyn_mut(ptr: *mut (), metadata: usize) -> *mut RawOp<dyn OpCode> {
OpCodePtrRepr {
components: OpCodePtrComponents {
metadata,
data_pointer: ptr,
},
}
.ptr
}
/// A typed wrapper for key of Ops submitted into driver. It doesn't free the
/// inner on dropping. Instead, the memory is managed by the proactor. The inner
/// is only freed when:
///
/// 1. The op is completed and the future asks the result. `into_inner` will be
/// called by the proactor.
/// 2. The op is completed and the future cancels it. `into_box` will be called
/// by the proactor.
#[derive(PartialEq, Eq, Hash)]
pub struct Key<T: ?Sized> {
user_data: *mut (),
_p: PhantomData<Box<RawOp<T>>>,
}
impl<T: ?Sized> Unpin for Key<T> {}
impl<T: OpCode + 'static> Key<T> {
/// Create [`RawOp`] and get the [`Key`] to it.
pub(crate) fn new(driver: RawFd, op: T) -> Self {
let header = Overlapped::new(driver);
let raw_op = Box::new(RawOp {
header,
cancelled: false,
metadata: opcode_metadata::<T>(),
result: PushEntry::Pending(None),
flags: 0,
op,
});
unsafe { Self::new_unchecked(Box::into_raw(raw_op) as _) }
}
}
impl<T: ?Sized> Key<T> {
/// Create a new `Key` with the given user data.
///
/// # Safety
///
/// Caller needs to ensure that `T` does correspond to `user_data` in driver
/// this `Key` is created with. In most cases, it is enough to let `T` be
/// `dyn OpCode`.
pub unsafe fn new_unchecked(user_data: usize) -> Self {
Self {
user_data: user_data as _,
_p: PhantomData,
}
}
/// Get the unique user-defined data.
pub fn user_data(&self) -> usize {
self.user_data as _
}
fn as_opaque(&self) -> &RawOp<()> {
// SAFETY: user_data is unique and RawOp is repr(C).
unsafe { &*(self.user_data as *const RawOp<()>) }
}
fn as_opaque_mut(&mut self) -> &mut RawOp<()> {
// SAFETY: see `as_opaque`.
unsafe { &mut *(self.user_data as *mut RawOp<()>) }
}
fn as_dyn_mut_ptr(&mut self) -> *mut RawOp<dyn OpCode> {
let user_data = self.user_data;
let this = self.as_opaque_mut();
// SAFETY: metadata from `Key::new`.
unsafe { opcode_dyn_mut(user_data, this.metadata) }
}
/// A pointer to OVERLAPPED.
#[cfg(windows)]
pub(crate) fn as_mut_ptr(&mut self) -> *mut Overlapped {
&mut self.as_opaque_mut().header
}
/// Cancel the op, decrease the ref count. The return value indicates if the
/// op is completed. If so, the op should be dropped because it is
/// useless.
pub(crate) fn set_cancelled(&mut self) -> bool {
self.as_opaque_mut().cancelled = true;
self.has_result()
}
/// Complete the op, decrease the ref count. Wake the future if a waker is
/// set. The return value indicates if the op is cancelled. If so, the
/// op should be dropped because it is useless.
pub(crate) fn set_result(&mut self, res: io::Result<usize>) -> bool {
let this = unsafe { &mut *self.as_dyn_mut_ptr() };
#[cfg(all(target_os = "linux", feature = "io-uring"))]
if let Ok(res) = res {
unsafe {
Pin::new_unchecked(&mut this.op).set_result(res);
}
}
if let PushEntry::Pending(Some(w)) =
std::mem::replace(&mut this.result, PushEntry::Ready(res))
{
w.wake();
}
this.cancelled
}
/// Whether the op is completed.
pub(crate) fn has_result(&self) -> bool {
self.as_opaque().result.is_ready()
}
/// Set waker of the current future.
pub(crate) fn set_waker(&mut self, waker: Waker) {
if let PushEntry::Pending(w) = &mut self.as_opaque_mut().result {
*w = Some(waker)
}
}
/// Get the inner [`RawOp`]. It is usually used to drop the inner
/// immediately, without knowing about the inner `T`.
///
/// # Safety
///
/// Call it only when the op is cancelled and completed, which is the case
/// when the ref count becomes zero. See doc of [`Key::set_cancelled`]
/// and [`Key::set_result`].
pub(crate) unsafe fn into_box(mut self) -> Box<RawOp<dyn OpCode>> {
Box::from_raw(self.as_dyn_mut_ptr())
}
}
impl<T> Key<T> {
/// Get the inner result if it is completed.
///
/// # Safety
///
/// Call it only when the op is completed, otherwise it is UB.
pub(crate) unsafe fn into_inner(self) -> (io::Result<usize>, T) {
let op = unsafe { Box::from_raw(self.user_data as *mut RawOp<T>) };
(op.result.take_ready().unwrap_unchecked(), op.op)
}
}
impl<T: OpCode + ?Sized> Key<T> {
/// Pin the inner op.
pub(crate) fn as_op_pin(&mut self) -> Pin<&mut dyn OpCode> {
// SAFETY: the inner won't be moved.
unsafe {
let this = &mut *self.as_dyn_mut_ptr();
Pin::new_unchecked(&mut this.op)
}
}
}
impl<T: ?Sized> std::fmt::Debug for Key<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Key({})", self.user_data())
}
}
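The `OpCodePtrRepr` union above assumes the de-facto fat-pointer layout (data pointer first, metadata second). A standalone sketch of the same trick under that assumption, with a hypothetical `Speak` trait (illustrative, not part of the crate):
trait Speak {
    fn speak(&self) -> &'static str;
}
struct Dog;
impl Speak for Dog {
    fn speak(&self) -> &'static str { "woof" }
}

#[repr(C)]
union FatPtr {
    ptr: *mut dyn Speak,
    parts: FatPtrParts,
}
#[repr(C)]
#[derive(Clone, Copy)]
struct FatPtrParts {
    data: *mut (),
    vtable: usize,
}

fn main() {
    let raw: *mut dyn Speak = Box::into_raw(Box::new(Dog));
    // SAFETY: assumes the (data, vtable) layout, exactly like `OpCodePtrRepr`.
    let parts = unsafe { FatPtr { ptr: raw }.parts };
    // Rebuild the fat pointer from the thin data pointer plus the saved metadata.
    let rebuilt = unsafe { FatPtr { parts }.ptr };
    let dog: Box<dyn Speak> = unsafe { Box::from_raw(rebuilt) };
    assert_eq!(dog.speak(), "woof");
}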

View file

@@ -1,455 +0,0 @@
//! The platform-specific driver.
//! Some types differ by compilation target.
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![allow(clippy::type_complexity)]
// #[cfg(all(
// target_os = "linux",
// not(feature = "io-uring"),
// not(feature = "polling")
// ))]
// compile_error!(
// "You must choose at least one of these features: [\"io-uring\", \"polling\"]"
// );
#[cfg(unix)]
use std::{io, task::Poll, task::Waker, time::Duration};
#[cfg(unix)]
mod key;
#[cfg(unix)]
pub use key::Key;
#[cfg(unix)]
pub mod op;
#[cfg(unix)]
#[cfg_attr(docsrs, doc(cfg(all())))]
mod unix;
#[cfg(unix)]
use unix::Overlapped;
#[cfg(unix)]
mod asyncify;
#[cfg(unix)]
pub use asyncify::*;
mod driver_type;
pub use driver_type::*;
cfg_if::cfg_if! {
//if #[cfg(windows)] {
// #[path = "iocp/mod.rs"]
// mod sys;
//} else
if #[cfg(all(target_os = "linux", feature = "io-uring"))] {
#[path = "uring/mod.rs"]
mod sys;
} else if #[cfg(unix)] {
#[path = "poll/mod.rs"]
mod sys;
}
}
#[cfg(unix)]
pub use sys::*;
#[cfg(windows)]
#[macro_export]
#[doc(hidden)]
macro_rules! syscall {
(BOOL, $e:expr) => {
$crate::syscall!($e, == 0)
};
(SOCKET, $e:expr) => {
$crate::syscall!($e, != 0)
};
(HANDLE, $e:expr) => {
$crate::syscall!($e, == ::windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE)
};
($e:expr, $op: tt $rhs: expr) => {{
#[allow(unused_unsafe)]
let res = unsafe { $e };
if res $op $rhs {
Err(::std::io::Error::last_os_error())
} else {
Ok(res)
}
}};
}
/// Helper macro to execute a system call
#[cfg(unix)]
#[macro_export]
#[doc(hidden)]
macro_rules! syscall {
(break $e:expr) => {
loop {
match $crate::syscall!($e) {
Ok(fd) => break ::std::task::Poll::Ready(Ok(fd as usize)),
Err(e) if e.kind() == ::std::io::ErrorKind::WouldBlock || e.raw_os_error() == Some(::libc::EINPROGRESS)
=> break ::std::task::Poll::Pending,
Err(e) if e.kind() == ::std::io::ErrorKind::Interrupted => {},
Err(e) => break ::std::task::Poll::Ready(Err(e)),
}
}
};
($e:expr, $f:ident($fd:expr)) => {
match $crate::syscall!(break $e) {
::std::task::Poll::Pending => Ok($crate::sys::Decision::$f($fd)),
::std::task::Poll::Ready(Ok(res)) => Ok($crate::sys::Decision::Completed(res)),
::std::task::Poll::Ready(Err(e)) => Err(e),
}
};
($e:expr) => {{
#[allow(unused_unsafe)]
let res = unsafe { $e };
if res == -1 {
Err(::std::io::Error::last_os_error())
} else {
Ok(res)
}
}};
}
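Roughly what the basic arm of this macro does; a hand-expanded sketch (error mapping only, without the `break`/`Decision` variants):
fn dup_stdin() -> std::io::Result<libc::c_int> {
    // `syscall!(libc::dup(0))` expands to approximately the following:
    let res = unsafe { libc::dup(0) };
    if res == -1 {
        Err(std::io::Error::last_os_error())
    } else {
        Ok(res)
    }
}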
#[macro_export]
#[doc(hidden)]
macro_rules! impl_raw_fd {
($t:ty, $it:ty, $inner:ident) => {
impl $crate::driver::AsRawFd for $t {
fn as_raw_fd(&self) -> $crate::driver::RawFd {
self.$inner.as_raw_fd()
}
}
#[cfg(unix)]
impl std::os::fd::FromRawFd for $t {
unsafe fn from_raw_fd(fd: $crate::driver::RawFd) -> Self {
Self {
$inner: std::os::fd::FromRawFd::from_raw_fd(fd),
}
}
}
};
($t:ty, $it:ty, $inner:ident,file) => {
$crate::impl_raw_fd!($t, $it, $inner);
#[cfg(windows)]
impl std::os::windows::io::FromRawHandle for $t {
unsafe fn from_raw_handle(handle: std::os::windows::io::RawHandle) -> Self {
Self {
$inner: std::os::windows::io::FromRawHandle::from_raw_handle(handle),
}
}
}
#[cfg(windows)]
impl std::os::windows::io::AsRawHandle for $t {
fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
self.$inner.as_raw_handle()
}
}
};
($t:ty, $it:ty, $inner:ident,socket) => {
$crate::impl_raw_fd!($t, $it, $inner);
#[cfg(windows)]
impl std::os::windows::io::FromRawSocket for $t {
unsafe fn from_raw_socket(sock: std::os::windows::io::RawSocket) -> Self {
Self {
$inner: std::os::windows::io::FromRawSocket::from_raw_socket(sock),
}
}
}
#[cfg(windows)]
impl std::os::windows::io::AsRawSocket for $t {
fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
self.$inner.as_raw_socket()
}
}
};
}
/// The return type of [`Proactor::push`].
pub enum PushEntry<K, R> {
/// The operation is pushed to the submission queue.
Pending(K),
/// The operation is ready and returns.
Ready(R),
}
impl<K, R> PushEntry<K, R> {
/// Get if the current variant is [`PushEntry::Ready`].
pub const fn is_ready(&self) -> bool {
matches!(self, Self::Ready(_))
}
/// Take the ready variant if it exists.
pub fn take_ready(self) -> Option<R> {
match self {
Self::Pending(_) => None,
Self::Ready(res) => Some(res),
}
}
/// Map the [`PushEntry::Pending`] branch.
pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
match self {
Self::Pending(k) => PushEntry::Pending(f(k)),
Self::Ready(r) => PushEntry::Ready(r),
}
}
/// Map the [`PushEntry::Ready`] branch.
pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
match self {
Self::Pending(k) => PushEntry::Pending(k),
Self::Ready(r) => PushEntry::Ready(f(r)),
}
}
}
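A small illustrative consumer of `PushEntry` (not part of the diff):
fn handle(entry: PushEntry<usize, std::io::Result<usize>>) {
    match entry {
        // The key/user_data came back: the op was queued and completes later.
        PushEntry::Pending(user_data) => println!("submitted, key = {user_data}"),
        // The op finished inline (e.g. a syscall succeeded immediately).
        PushEntry::Ready(res) => println!("done immediately: {res:?}"),
    }
}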
#[cfg(unix)]
/// Low-level actions of completion-based IO.
/// It owns the operations to keep the driver safe.
pub struct Proactor {
driver: Driver,
}
#[cfg(unix)]
impl Proactor {
/// Create [`Proactor`] with 1024 entries.
pub fn new() -> io::Result<Self> {
Self::builder().build()
}
/// Create [`ProactorBuilder`] to config the proactor.
pub fn builder() -> ProactorBuilder {
ProactorBuilder::new()
}
fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
Ok(Self {
driver: Driver::new(builder)?,
})
}
/// Cancel an operation with the pushed user-defined data.
///
/// The cancellation is not reliable. The underlying operation may continue;
/// it just won't be returned from [`Proactor::poll`]. Therefore, even though an
/// operation is cancelled, you should not reuse its `user_data`.
pub fn cancel<T: OpCode>(&self, mut op: Key<T>) -> Option<(io::Result<usize>, T)> {
if op.set_cancelled() {
// SAFETY: completed.
Some(unsafe { op.into_inner() })
} else {
None
}
}
/// Push an operation into the driver, and return the unique key, called
/// user-defined data, associated with it.
pub fn push<T: OpCode + 'static>(
&self,
op: T,
) -> PushEntry<Key<T>, (io::Result<usize>, T)> {
let mut op = self.driver.create_op(op);
match self
.driver
.push(&mut unsafe { Key::<dyn OpCode>::new_unchecked(op.user_data()) })
{
Poll::Pending => PushEntry::Pending(op),
Poll::Ready(res) => {
op.set_result(res);
// SAFETY: just completed.
PushEntry::Ready(unsafe { op.into_inner() })
}
}
}
/// Poll the driver and get completed entries.
/// You need to call [`Proactor::pop`] to get the pushed
/// operations.
pub fn poll<F: FnOnce()>(&self, timeout: Option<Duration>, f: F) -> io::Result<()> {
unsafe { self.driver.poll(timeout, f) }
}
/// Get the pushed operations from the completion entries.
///
/// # Panics
/// This function will panic if the requested operation has not been
/// completed.
pub fn pop<T>(&self, op: Key<T>) -> PushEntry<Key<T>, (io::Result<usize>, T)> {
if op.has_result() {
// SAFETY: completed.
PushEntry::Ready(unsafe { op.into_inner() })
} else {
PushEntry::Pending(op)
}
}
/// Update the waker of the specified op.
pub fn update_waker<T>(&self, op: &mut Key<T>, waker: Waker) {
op.set_waker(waker);
}
/// Create a notify handle to interrupt the inner driver.
pub fn handle(&self) -> NotifyHandle {
self.driver.handle()
}
pub fn register_handler<F>(&self, f: F)
where
F: FnOnce(DriverApi) -> Box<dyn sys::op::Handler>,
{
self.driver.register_handler(f)
}
}
#[cfg(unix)]
impl AsRawFd for Proactor {
fn as_raw_fd(&self) -> RawFd {
self.driver.as_raw_fd()
}
}
/// A completed entry returned from the kernel.
#[cfg(unix)]
#[derive(Debug)]
pub(crate) struct Entry {
user_data: usize,
result: io::Result<usize>,
}
#[cfg(unix)]
impl Entry {
pub(crate) fn new(user_data: usize, result: io::Result<usize>) -> Self {
Self { user_data, result }
}
/// The user-defined data returned by [`Proactor::push`].
pub fn user_data(&self) -> usize {
self.user_data
}
/// The result of the operation.
pub fn into_result(self) -> io::Result<usize> {
self.result
}
/// SAFETY: `user_data` should be a valid pointer.
pub unsafe fn notify(self) {
let user_data = self.user_data();
let mut op = Key::<()>::new_unchecked(user_data);
if op.set_result(self.into_result()) {
// SAFETY: completed and cancelled.
let _ = op.into_box();
}
}
}
#[cfg(unix)]
#[derive(Debug, Clone)]
enum ThreadPoolBuilder {
Create { limit: usize, recv_limit: Duration },
Reuse(AsyncifyPool),
}
#[cfg(unix)]
impl Default for ThreadPoolBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(unix)]
impl ThreadPoolBuilder {
pub fn new() -> Self {
Self::Create {
limit: 256,
recv_limit: Duration::from_secs(60),
}
}
pub fn create_or_reuse(&self) -> AsyncifyPool {
match self {
Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
Self::Reuse(pool) => pool.clone(),
}
}
}
/// Builder for [`Proactor`].
#[cfg(unix)]
#[derive(Debug, Clone)]
pub struct ProactorBuilder {
capacity: u32,
pool_builder: ThreadPoolBuilder,
}
#[cfg(unix)]
impl Default for ProactorBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(unix)]
impl ProactorBuilder {
/// Create the builder with default config.
pub fn new() -> Self {
Self {
capacity: 1024,
pool_builder: ThreadPoolBuilder::new(),
}
}
/// Set the capacity of the inner event queue or submission queue, if it
/// exists. The default value is 1024.
pub fn capacity(&mut self, capacity: u32) -> &mut Self {
self.capacity = capacity;
self
}
/// Set the thread number limit of the inner thread pool, if it exists. The
/// default value is 256.
///
/// It will be ignored if `reuse_thread_pool` is set.
pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
*limit = value;
}
self
}
/// Set the waiting timeout of the inner thread, if it exists. The default is
/// 60 seconds.
///
/// It will be ignored if `reuse_thread_pool` is set.
pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
*recv_limit = timeout;
}
self
}
/// Set to reuse an existing [`AsyncifyPool`] in this proactor.
pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
self.pool_builder = ThreadPoolBuilder::Reuse(pool);
self
}
/// Force reuse of the thread pool for each proactor created by this builder,
/// even if `reuse_thread_pool` is not set.
pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
self.reuse_thread_pool(self.create_or_get_thread_pool());
self
}
/// Create or reuse the thread pool from the config.
pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
self.pool_builder.create_or_reuse()
}
/// Build the [`Proactor`].
pub fn build(&self) -> io::Result<Proactor> {
Proactor::with_builder(self)
}
}
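Putting the pieces together, a hedged crate-internal sketch (not part of the diff): push one blocking `CreateSocket` op and drive it to completion with `poll`/`pop`; the capacity and socket parameters are illustrative.
fn create_socket_blocking() -> io::Result<usize> {
    let proactor = Proactor::builder().capacity(256).build()?;
    // CreateSocket is a blocking op: push() returns Pending and the work is
    // dispatched to the thread pool while the driver is polled.
    let op = crate::driver::op::CreateSocket::new(libc::AF_INET, libc::SOCK_STREAM, 0);
    match proactor.push(op) {
        PushEntry::Ready((res, _op)) => res,
        PushEntry::Pending(mut key) => loop {
            proactor.poll(None, || ())?;
            match proactor.pop(key) {
                // The value is the new raw fd, reported as usize.
                PushEntry::Ready((res, _op)) => break res,
                PushEntry::Pending(k) => key = k,
            }
        },
    }
}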

View file

@@ -1,45 +0,0 @@
//! The async operations.
//!
//! Types in this mod represent the low-level operations passed to the kernel.
//! The operation itself doesn't perform anything.
//! You need to pass them to [`crate::Proactor`], and poll the driver.
use std::{marker::PhantomPinned, net::Shutdown};
#[cfg(unix)]
pub use super::sys::op::*;
/// Spawn a blocking function in the thread pool.
pub struct Asyncify<F, D> {
pub(crate) f: Option<F>,
pub(crate) data: Option<D>,
_p: PhantomPinned,
}
impl<F, D> Asyncify<F, D> {
/// Create [`Asyncify`].
pub fn new(f: F) -> Self {
Self {
f: Some(f),
data: None,
_p: PhantomPinned,
}
}
pub fn into_inner(mut self) -> D {
self.data.take().expect("the data should not be None")
}
}
/// Shutdown a socket.
pub struct ShutdownSocket<S> {
pub(crate) fd: S,
pub(crate) how: Shutdown,
}
impl<S> ShutdownSocket<S> {
/// Create [`ShutdownSocket`].
pub fn new(fd: S, how: Shutdown) -> Self {
Self { fd, how }
}
}

View file

@@ -1,456 +0,0 @@
#![allow(clippy::type_complexity)]
pub use std::os::fd::{AsRawFd, OwnedFd, RawFd};
use std::{cell::Cell, cell::RefCell, io, rc::Rc, sync::Arc};
use std::{num::NonZeroUsize, os::fd::BorrowedFd, pin::Pin, task::Poll, time::Duration};
use crossbeam_queue::SegQueue;
use nohash_hasher::IntMap;
use polling::{Event, Events, Poller};
use crate::driver::{op::Interest, sys, AsyncifyPool, Entry, Key, ProactorBuilder};
pub(crate) mod op;
/// Abstraction of operations.
pub trait OpCode {
/// Perform the operation before submitting, and return a [`Decision`] to
/// indicate whether submitting the operation to polling is required.
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision>;
/// Perform the operation after receiving the corresponding event. If this
/// operation is blocking, the return value should be [`Poll::Ready`].
fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>>;
}
/// Result of [`OpCode::pre_submit`].
#[non_exhaustive]
pub enum Decision {
/// Instant operation, no need to submit
Completed(usize),
/// Blocking operation, needs to be spawned in another thread
Blocking,
}
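An illustrative `OpCode` for this poll driver (not in the diff): an always-blocking operation that the driver hands to its thread pool; the `Sleep` type is hypothetical.
struct Sleep(Duration);

impl OpCode for Sleep {
    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
        // Not tied to an fd and never ready inline: run it on the blocking pool.
        Ok(Decision::Blocking)
    }
    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
        // Runs on an AsyncifyPool thread; a blocking op must return Ready here.
        std::thread::sleep(self.0);
        Poll::Ready(Ok(0))
    }
}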
bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 {
const NEW = 0b0000_0001;
const CHANGED = 0b0000_0010;
}
}
#[derive(Debug)]
struct FdItem {
flags: Flags,
batch: usize,
read: Option<usize>,
write: Option<usize>,
}
impl FdItem {
fn new(batch: usize) -> Self {
Self {
batch,
read: None,
write: None,
flags: Flags::NEW,
}
}
fn register(&mut self, user_data: usize, interest: Interest) {
self.flags.insert(Flags::CHANGED);
match interest {
Interest::Readable => {
self.read = Some(user_data);
}
Interest::Writable => {
self.write = Some(user_data);
}
}
}
fn unregister(&mut self, int: Interest) {
let res = match int {
Interest::Readable => self.read.take(),
Interest::Writable => self.write.take(),
};
if res.is_some() {
self.flags.insert(Flags::CHANGED);
}
}
fn unregister_all(&mut self) {
if self.read.is_some() || self.write.is_some() {
self.flags.insert(Flags::CHANGED);
}
let _ = self.read.take();
let _ = self.write.take();
}
fn user_data(&mut self, interest: Interest) -> Option<usize> {
match interest {
Interest::Readable => self.read,
Interest::Writable => self.write,
}
}
fn event(&self, key: usize) -> Event {
let mut event = Event::none(key);
if self.read.is_some() {
event.readable = true;
}
if self.write.is_some() {
event.writable = true;
}
event
}
}
#[derive(Debug)]
enum Change {
Register {
fd: RawFd,
batch: usize,
user_data: usize,
int: Interest,
},
Unregister {
fd: RawFd,
batch: usize,
int: Interest,
},
UnregisterAll {
fd: RawFd,
batch: usize,
},
Blocking {
user_data: usize,
},
}
pub struct DriverApi {
batch: usize,
changes: Rc<RefCell<Vec<Change>>>,
}
impl DriverApi {
pub fn register(&self, fd: RawFd, user_data: usize, int: Interest) {
log::debug!(
"Register interest {:?} for {:?} user-data: {:?}",
int,
fd,
user_data
);
self.change(Change::Register {
fd,
batch: self.batch,
user_data,
int,
});
}
pub fn unregister(&self, fd: RawFd, int: Interest) {
log::debug!(
"Unregister interest {:?} for {:?} batch: {:?}",
int,
fd,
self.batch
);
self.change(Change::Unregister {
fd,
batch: self.batch,
int,
});
}
pub fn unregister_all(&self, fd: RawFd) {
self.change(Change::UnregisterAll {
fd,
batch: self.batch,
});
}
fn change(&self, change: Change) {
self.changes.borrow_mut().push(change);
}
}
/// Low-level driver of polling.
pub(crate) struct Driver {
poll: Arc<Poller>,
events: RefCell<Events>,
registry: RefCell<IntMap<RawFd, FdItem>>,
pool: AsyncifyPool,
pool_completed: Arc<SegQueue<Entry>>,
hid: Cell<usize>,
changes: Rc<RefCell<Vec<Change>>>,
handlers: Cell<Option<Box<Vec<Box<dyn self::op::Handler>>>>>,
}
impl Driver {
pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
log::trace!("New poll driver");
let entries = builder.capacity as usize; // for the sake of consistency, use u32 like iour
let events = if entries == 0 {
Events::new()
} else {
Events::with_capacity(NonZeroUsize::new(entries).unwrap())
};
Ok(Self {
poll: Arc::new(Poller::new()?),
events: RefCell::new(events),
registry: RefCell::new(Default::default()),
pool: builder.create_or_get_thread_pool(),
pool_completed: Arc::new(SegQueue::new()),
hid: Cell::new(0),
changes: Rc::new(RefCell::new(Vec::with_capacity(16))),
handlers: Cell::new(Some(Box::new(Vec::default()))),
})
}
pub fn register_handler<F>(&self, f: F)
where
F: FnOnce(DriverApi) -> Box<dyn self::op::Handler>,
{
let id = self.hid.get();
let mut handlers = self.handlers.take().unwrap_or_default();
let api = DriverApi {
batch: id,
changes: self.changes.clone(),
};
handlers.push(f(api));
self.hid.set(id + 1);
self.handlers.set(Some(handlers));
}
pub fn create_op<T: sys::OpCode + 'static>(&self, op: T) -> Key<T> {
Key::new(self.as_raw_fd(), op)
}
pub fn push(&self, op: &mut Key<dyn sys::OpCode>) -> Poll<io::Result<usize>> {
let user_data = op.user_data();
let op_pin = op.as_op_pin();
match op_pin.pre_submit()? {
Decision::Completed(res) => Poll::Ready(Ok(res)),
Decision::Blocking => {
self.changes
.borrow_mut()
.push(Change::Blocking { user_data });
Poll::Pending
}
}
}
pub unsafe fn poll<F: FnOnce()>(
&self,
timeout: Option<Duration>,
f: F,
) -> io::Result<()> {
if self.poll_blocking() {
f();
self.apply_changes()?;
return Ok(());
}
let mut events = self.events.borrow_mut();
self.poll.wait(&mut events, timeout)?;
if events.is_empty() {
if timeout.is_some() && timeout != Some(Duration::ZERO) {
return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
}
} else {
// println!("POLL, events: {:?}", events.len());
let mut registry = self.registry.borrow_mut();
let mut handlers = self.handlers.take().unwrap();
for event in events.iter() {
let fd = event.key as RawFd;
log::debug!("Event {:?} for {:?}", event, registry.get(&fd));
if let Some(item) = registry.get_mut(&fd) {
if event.readable {
if let Some(user_data) = item.user_data(Interest::Readable) {
handlers[item.batch].readable(user_data)
}
}
if event.writable {
if let Some(user_data) = item.user_data(Interest::Writable) {
handlers[item.batch].writable(user_data)
}
}
}
}
self.handlers.set(Some(handlers));
}
// apply changes
self.apply_changes()?;
// complete batch handling
let mut handlers = self.handlers.take().unwrap();
for handler in handlers.iter_mut() {
handler.commit();
}
self.handlers.set(Some(handlers));
self.apply_changes()?;
// run user function
f();
// check if we have more changes from "run"
self.apply_changes()?;
Ok(())
}
/// re-calc driver changes
unsafe fn apply_changes(&self) -> io::Result<()> {
let mut changes = self.changes.borrow_mut();
if changes.is_empty() {
return Ok(());
}
log::debug!("Apply driver changes, {:?}", changes.len());
let mut registry = self.registry.borrow_mut();
for change in &mut *changes {
match change {
Change::Register {
fd,
batch,
user_data,
int,
} => {
let item = registry.entry(*fd).or_insert_with(|| FdItem::new(*batch));
item.register(*user_data, *int);
}
Change::Unregister { fd, batch, int } => {
let item = registry.entry(*fd).or_insert_with(|| FdItem::new(*batch));
item.unregister(*int);
}
Change::UnregisterAll { fd, batch } => {
let item = registry.entry(*fd).or_insert_with(|| FdItem::new(*batch));
item.unregister_all();
}
_ => {}
}
}
for change in changes.drain(..) {
let fd = match change {
Change::Register { fd, .. } => Some(fd),
Change::Unregister { fd, .. } => Some(fd),
Change::UnregisterAll { fd, .. } => Some(fd),
Change::Blocking { user_data } => {
self.push_blocking(user_data);
None
}
};
if let Some(fd) = fd {
if let Some(item) = registry.get_mut(&fd) {
if item.flags.contains(Flags::CHANGED) {
item.flags.remove(Flags::CHANGED);
let new = item.flags.contains(Flags::NEW);
let renew_event = item.event(fd as usize);
if !renew_event.readable && !renew_event.writable {
// crate::log(format!("DELETE - {:?}", fd.as_raw_fd()));
registry.remove(&fd);
if !new {
self.poll.delete(BorrowedFd::borrow_raw(fd))?;
}
} else if new {
item.flags.remove(Flags::NEW);
// crate::log(format!("ADD - {:?}", fd.as_raw_fd()));
unsafe { self.poll.add(fd, renew_event)? };
} else {
// crate::log(format!("MODIFY - {:?} {:?}", fd.as_raw_fd(), renew_event));
self.poll.modify(BorrowedFd::borrow_raw(fd), renew_event)?;
}
}
}
}
}
Ok(())
}
fn push_blocking(&self, user_data: usize) {
let poll = self.poll.clone();
let completed = self.pool_completed.clone();
let mut closure = move || {
let mut op = unsafe { Key::<dyn sys::OpCode>::new_unchecked(user_data) };
let op_pin = op.as_op_pin();
let res = match op_pin.operate() {
Poll::Pending => unreachable!("this operation is not non-blocking"),
Poll::Ready(res) => res,
};
completed.push(Entry::new(user_data, res));
poll.notify().ok();
};
while let Err(e) = self.pool.dispatch(closure) {
closure = e.0;
self.poll_blocking();
}
}
fn poll_blocking(&self) -> bool {
if self.pool_completed.is_empty() {
false
} else {
while let Some(entry) = self.pool_completed.pop() {
unsafe {
entry.notify();
}
}
true
}
}
/// Get notification handle
pub fn handle(&self) -> NotifyHandle {
NotifyHandle::new(self.poll.clone())
}
}
impl AsRawFd for Driver {
fn as_raw_fd(&self) -> RawFd {
self.poll.as_raw_fd()
}
}
impl Drop for Driver {
fn drop(&mut self) {
for fd in self.registry.borrow().keys() {
unsafe {
let fd = BorrowedFd::borrow_raw(*fd);
self.poll.delete(fd).ok();
}
}
}
}
#[derive(Clone)]
/// A notify handle to the inner driver.
pub struct NotifyHandle {
poll: Arc<Poller>,
}
impl NotifyHandle {
fn new(poll: Arc<Poller>) -> Self {
Self { poll }
}
/// Notify the driver
pub fn notify(&self) -> io::Result<()> {
self.poll.notify()
}
}

View file

@ -1,96 +0,0 @@
use std::{io, marker::Send, mem, os::fd::FromRawFd, os::fd::RawFd, pin::Pin, task::Poll};
pub use crate::driver::unix::op::*;
use super::{AsRawFd, Decision, OpCode, OwnedFd};
use crate::{driver::op::*, syscall};
pub trait Handler {
/// Submitted interest
fn readable(&mut self, id: usize);
/// Submitted interest
fn writable(&mut self, id: usize);
/// Operation submission has failed
fn error(&mut self, id: usize, err: io::Error);
/// All events are processed, process all updates
fn commit(&mut self);
}
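For illustration (not part of the diff), a skeletal `Handler` as it would be registered through `Proactor::register_handler`; the real bookkeeping is omitted and `EchoHandler` is a hypothetical name.
struct EchoHandler {
    // Stored so the handler can (un)register interests as operations progress.
    api: crate::driver::DriverApi,
}

impl Handler for EchoHandler {
    fn readable(&mut self, id: usize) {
        // `id` is the user_data passed to DriverApi::register(fd, id, Interest::Readable).
        log::debug!("op {} is readable; perform the read here", id);
    }
    fn writable(&mut self, id: usize) {
        log::debug!("op {} is writable", id);
    }
    fn error(&mut self, id: usize, err: io::Error) {
        log::debug!("op {} failed: {}", id, err);
    }
    fn commit(&mut self) {
        // All events for this wake-up have been delivered; flush batched state here.
    }
}

// Registered via: proactor.register_handler(|api| Box::new(EchoHandler { api }));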
impl<D, F> OpCode for Asyncify<F, D>
where
D: Send + 'static,
F: (FnOnce() -> (io::Result<usize>, D)) + Send + 'static,
{
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
Ok(Decision::Blocking)
}
fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
// Safety: self won't be moved
let this = unsafe { self.get_unchecked_mut() };
let f = this
.f
.take()
.expect("the operate method could only be called once");
let (res, data) = f();
this.data = Some(data);
Poll::Ready(res)
}
}
/// Close socket fd.
pub struct CloseSocket {
pub(crate) fd: mem::ManuallyDrop<OwnedFd>,
}
impl CloseSocket {
/// Create [`CloseSocket`].
pub fn new(fd: OwnedFd) -> Self {
Self {
fd: mem::ManuallyDrop::new(fd),
}
}
}
impl OpCode for CreateSocket {
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
Ok(Decision::Blocking)
}
fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(
syscall!(libc::socket(self.domain, self.socket_type, self.protocol))? as _,
))
}
}
impl<S: AsRawFd> OpCode for ShutdownSocket<S> {
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
Ok(Decision::Blocking)
}
fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(
syscall!(libc::shutdown(self.fd.as_raw_fd(), self.how()))? as _,
))
}
}
impl CloseSocket {
pub fn from_raw_fd(fd: RawFd) -> Self {
Self::new(unsafe { FromRawFd::from_raw_fd(fd) })
}
}
impl OpCode for CloseSocket {
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
Ok(Decision::Blocking)
}
fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(syscall!(libc::close(self.fd.as_raw_fd()))? as _))
}
}

View file

@@ -1,15 +0,0 @@
//! This mod doesn't actually contain any driver, but is meant to provide some
//! common op types and utilities for the unix platform (for io-uring and polling).
pub(crate) mod op;
use crate::driver::RawFd;
/// The overlapped struct for unix needn't contain extra fields.
pub(crate) struct Overlapped;
impl Overlapped {
pub fn new(_driver: RawFd) -> Self {
Self
}
}

View file

@@ -1,40 +0,0 @@
use std::net::Shutdown;
use crate::driver::op::*;
/// The interest to poll a file descriptor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Interest {
/// Represents a read operation.
Readable,
/// Represents a write operation.
Writable,
}
/// Create a socket.
pub struct CreateSocket {
pub(crate) domain: i32,
pub(crate) socket_type: i32,
pub(crate) protocol: i32,
}
impl CreateSocket {
/// Create [`CreateSocket`].
pub fn new(domain: i32, socket_type: i32, protocol: i32) -> Self {
Self {
domain,
socket_type,
protocol,
}
}
}
impl<S> ShutdownSocket<S> {
pub(crate) fn how(&self) -> i32 {
match self.how {
Shutdown::Write => libc::SHUT_WR,
Shutdown::Read => libc::SHUT_RD,
Shutdown::Both => libc::SHUT_RDWR,
}
}
}

View file

@@ -1,340 +0,0 @@
pub use std::os::fd::{AsRawFd, OwnedFd, RawFd};
use std::cell::{Cell, RefCell};
use std::{
collections::VecDeque, io, pin::Pin, rc::Rc, sync::Arc, task::Poll, time::Duration,
};
use crossbeam_queue::SegQueue;
use io_uring::cqueue::{more, Entry as CEntry};
use io_uring::opcode::{AsyncCancel, PollAdd};
use io_uring::squeue::Entry as SEntry;
use io_uring::types::{Fd, SubmitArgs, Timespec};
use io_uring::IoUring;
mod notify;
pub(crate) mod op;
pub use self::notify::NotifyHandle;
use self::notify::Notifier;
use crate::driver::{sys, AsyncifyPool, Entry, Key, ProactorBuilder};
/// Abstraction of io-uring operations.
pub trait OpCode {
/// Name of the operation
fn name(&self) -> &'static str;
/// Call the operation in a blocking way. This method will only be called if
/// the operation is dispatched to the blocking thread pool.
fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
unreachable!("this operation is asynchronous")
}
/// Set the result when it successfully completes.
/// The operation stores the result and is responsible for releasing it if the
/// operation is cancelled.
///
/// # Safety
///
/// Users should not call it.
unsafe fn set_result(self: Pin<&mut Self>, _: usize) {}
}
#[derive(Debug)]
enum Change {
Submit { entry: SEntry },
Cancel { op_id: u64 },
}
pub struct DriverApi {
batch: u64,
changes: Rc<RefCell<VecDeque<Change>>>,
}
impl DriverApi {
pub fn submit(&self, user_data: u32, entry: SEntry) {
log::debug!(
"Submit operation batch: {:?} user-data: {:?} entry: {:?}",
self.batch >> Driver::BATCH,
user_data,
entry,
);
self.changes.borrow_mut().push_back(Change::Submit {
entry: entry.user_data(user_data as u64 | self.batch),
});
}
pub fn cancel(&self, op_id: u32) {
log::debug!(
"Cancel operation batch: {:?} user-data: {:?}",
self.batch >> Driver::BATCH,
op_id
);
self.changes.borrow_mut().push_back(Change::Cancel {
op_id: op_id as u64 | self.batch,
});
}
}
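The 64-bit `user_data` is split by the `BATCH`/`BATCH_MASK` constants defined just below: handler index in the top 16 bits, per-operation id in the low 48. A hedged arithmetic sketch (here `batch` is the raw handler index; `DriverApi` stores it pre-shifted):
fn pack(batch: u64, op_id: u32) -> u64 {
    // handler index in bits 48..64, per-op id in bits 0..48
    (batch << 48) | op_id as u64
}

fn unpack(user_data: u64) -> (usize, usize) {
    let batch = ((user_data & 0xFFFF_0000_0000_0000) >> 48) as usize;
    let op_id = (user_data & 0x0000_FFFF_FFFF_FFFF) as usize;
    (batch, op_id)
}

// e.g. handler #3 submitting operation 7:
// assert_eq!(unpack(pack(3, 7)), (3, 7));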
/// Low-level driver of io-uring.
pub(crate) struct Driver {
ring: RefCell<IoUring<SEntry, CEntry>>,
notifier: Notifier,
pool: AsyncifyPool,
pool_completed: Arc<SegQueue<Entry>>,
hid: Cell<u64>,
changes: Rc<RefCell<VecDeque<Change>>>,
handlers: Cell<Option<Box<Vec<Box<dyn op::Handler>>>>>,
}
impl Driver {
const NOTIFY: u64 = u64::MAX;
const CANCEL: u64 = u64::MAX - 1;
const BATCH: u64 = 48;
const BATCH_MASK: u64 = 0xFFFF_0000_0000_0000;
const DATA_MASK: u64 = 0x0000_FFFF_FFFF_FFFF;
pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
log::trace!("New io-uring driver");
let mut ring = IoUring::builder()
.setup_coop_taskrun()
.setup_single_issuer()
.build(builder.capacity)?;
let notifier = Notifier::new()?;
#[allow(clippy::useless_conversion)]
unsafe {
ring.submission()
.push(
&PollAdd::new(Fd(notifier.as_raw_fd()), libc::POLLIN as _)
.multi(true)
.build()
.user_data(Self::NOTIFY)
.into(),
)
.expect("the squeue sould not be full");
}
Ok(Self {
notifier,
ring: RefCell::new(ring),
pool: builder.create_or_get_thread_pool(),
pool_completed: Arc::new(SegQueue::new()),
hid: Cell::new(0),
changes: Rc::new(RefCell::new(VecDeque::new())),
handlers: Cell::new(Some(Box::new(Vec::new()))),
})
}
pub fn register_handler<F>(&self, f: F)
where
F: FnOnce(DriverApi) -> Box<dyn self::op::Handler>,
{
let id = self.hid.get();
let mut handlers = self.handlers.take().unwrap_or_default();
let api = DriverApi {
batch: id << 48,
changes: self.changes.clone(),
};
handlers.push(f(api));
self.hid.set(id + 1);
self.handlers.set(Some(handlers));
}
// Auto means that it automatically chooses whether to wait.
fn submit_auto(&self, timeout: Option<Duration>) -> io::Result<()> {
let mut ring = self.ring.borrow_mut();
let res = {
// Last part of submission queue, wait till timeout.
if let Some(duration) = timeout {
let timespec = timespec(duration);
let args = SubmitArgs::new().timespec(&timespec);
ring.submitter().submit_with_args(1, &args)
} else {
ring.submit_and_wait(1)
}
};
match res {
Ok(_) => {
// log::debug!("Submit result: {res:?} {:?}", timeout);
if ring.completion().is_empty() {
Err(io::ErrorKind::TimedOut.into())
} else {
Ok(())
}
}
Err(e) => match e.raw_os_error() {
Some(libc::ETIME) => {
if timeout.is_some() && timeout != Some(Duration::ZERO) {
Err(io::ErrorKind::TimedOut.into())
} else {
Ok(())
}
}
Some(libc::EBUSY) | Some(libc::EAGAIN) => {
Err(io::ErrorKind::Interrupted.into())
}
_ => Err(e),
},
}
}
pub fn create_op<T: sys::OpCode + 'static>(&self, op: T) -> Key<T> {
Key::new(self.as_raw_fd(), op)
}
fn apply_changes(&self) -> bool {
let mut changes = self.changes.borrow_mut();
if changes.is_empty() {
return false;
}
log::debug!("Apply changes, {:?}", changes.len());
let mut ring = self.ring.borrow_mut();
let mut squeue = ring.submission();
while let Some(change) = changes.pop_front() {
match change {
Change::Submit { entry } => {
if unsafe { squeue.push(&entry) }.is_err() {
changes.push_front(Change::Submit { entry });
break;
}
}
Change::Cancel { op_id } => {
let entry = AsyncCancel::new(op_id).build().user_data(Self::CANCEL);
if unsafe { squeue.push(&entry) }.is_err() {
changes.push_front(Change::Cancel { op_id });
break;
}
}
}
}
squeue.sync();
!changes.is_empty()
}
pub unsafe fn poll<F: FnOnce()>(
&self,
timeout: Option<Duration>,
f: F,
) -> io::Result<()> {
self.poll_blocking();
let has_more = self.apply_changes();
let poll_result = self.poll_completions();
if !poll_result || has_more {
if has_more {
self.submit_auto(Some(Duration::ZERO))?;
} else {
self.submit_auto(timeout)?;
}
self.poll_completions();
}
f();
Ok(())
}
pub fn push(&self, op: &mut Key<dyn sys::OpCode>) -> Poll<io::Result<usize>> {
log::trace!("Push op: {:?}", op.as_op_pin().name());
let user_data = op.user_data();
loop {
if self.push_blocking(user_data) {
break Poll::Pending;
} else {
self.poll_blocking();
}
}
}
fn poll_completions(&self) -> bool {
let mut ring = self.ring.borrow_mut();
let mut cqueue = ring.completion();
cqueue.sync();
let has_entry = !cqueue.is_empty();
if !has_entry {
return false;
}
let mut handlers = self.handlers.take().unwrap();
for entry in cqueue {
let user_data = entry.user_data();
match user_data {
Self::CANCEL => {}
Self::NOTIFY => {
let flags = entry.flags();
debug_assert!(more(flags));
self.notifier.clear().expect("cannot clear notifier");
}
_ => {
let batch = ((user_data & Self::BATCH_MASK) >> Self::BATCH) as usize;
let user_data = (user_data & Self::DATA_MASK) as usize;
let result = entry.result();
if result == -libc::ECANCELED {
handlers[batch].canceled(user_data);
} else {
let result = if result < 0 {
Err(io::Error::from_raw_os_error(result))
} else {
Ok(result as _)
};
handlers[batch].completed(user_data, entry.flags(), result);
}
}
}
}
self.handlers.set(Some(handlers));
true
}
fn poll_blocking(&self) {
if !self.pool_completed.is_empty() {
while let Some(entry) = self.pool_completed.pop() {
unsafe {
entry.notify();
}
}
}
}
fn push_blocking(&self, user_data: usize) -> bool {
let handle = self.handle();
let completed = self.pool_completed.clone();
self.pool
.dispatch(move || {
let mut op = unsafe { Key::<dyn sys::OpCode>::new_unchecked(user_data) };
let op_pin = op.as_op_pin();
let res = op_pin.call_blocking();
completed.push(Entry::new(user_data, res));
handle.notify().ok();
})
.is_ok()
}
pub fn handle(&self) -> NotifyHandle {
self.notifier.handle()
}
}
impl AsRawFd for Driver {
fn as_raw_fd(&self) -> RawFd {
self.ring.borrow().as_raw_fd()
}
}
fn timespec(duration: std::time::Duration) -> Timespec {
Timespec::new()
.sec(duration.as_secs())
.nsec(duration.subsec_nanos())
}

View file

@@ -1,73 +0,0 @@
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
use std::{io, mem, sync::Arc};
use crate::syscall;
#[derive(Debug)]
pub(crate) struct Notifier {
fd: Arc<OwnedFd>,
}
impl Notifier {
/// Create a new notifier.
pub(crate) fn new() -> io::Result<Self> {
let fd = syscall!(libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?;
let fd = unsafe { OwnedFd::from_raw_fd(fd) };
Ok(Self { fd: Arc::new(fd) })
}
pub(crate) fn clear(&self) -> io::Result<()> {
loop {
let mut buffer = [0u64];
let res = syscall!(libc::read(
self.fd.as_raw_fd(),
buffer.as_mut_ptr().cast(),
mem::size_of::<u64>()
));
match res {
Ok(len) => {
debug_assert_eq!(len, mem::size_of::<u64>() as _);
break Ok(());
}
// Clear the next time:)
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break Ok(()),
// Just like read_exact
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) => break Err(e),
}
}
}
pub(crate) fn handle(&self) -> NotifyHandle {
NotifyHandle::new(self.fd.clone())
}
}
impl AsRawFd for Notifier {
fn as_raw_fd(&self) -> RawFd {
self.fd.as_raw_fd()
}
}
#[derive(Clone)]
/// A notify handle to the inner driver.
pub struct NotifyHandle {
fd: Arc<OwnedFd>,
}
impl NotifyHandle {
pub(crate) fn new(fd: Arc<OwnedFd>) -> Self {
Self { fd }
}
/// Notify the inner driver.
pub fn notify(&self) -> io::Result<()> {
let data = 1u64;
syscall!(libc::write(
self.fd.as_raw_fd(),
&data as *const _ as *const _,
std::mem::size_of::<u64>(),
))?;
Ok(())
}
}

View file

@@ -1,55 +0,0 @@
use std::{io, os::fd::AsRawFd, pin::Pin};
pub use crate::driver::unix::op::*;
use super::OpCode;
use crate::{driver::op::*, syscall};
pub trait Handler {
/// Operation is completed
fn completed(&mut self, user_data: usize, flags: u32, result: io::Result<i32>);
fn canceled(&mut self, user_data: usize);
}
impl<D, F> OpCode for Asyncify<F, D>
where
D: Send + 'static,
F: (FnOnce() -> (io::Result<usize>, D)) + Send + 'static,
{
fn name(&self) -> &'static str {
"Asyncify"
}
fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
// Safety: self won't be moved
let this = unsafe { self.get_unchecked_mut() };
let f = this
.f
.take()
.expect("the operate method could only be called once");
let (res, data) = f();
this.data = Some(data);
res
}
}
impl OpCode for CreateSocket {
fn name(&self) -> &'static str {
"CreateSocket"
}
fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
Ok(syscall!(libc::socket(self.domain, self.socket_type, self.protocol))? as _)
}
}
impl<S: AsRawFd> OpCode for ShutdownSocket<S> {
fn name(&self) -> &'static str {
"ShutdownSocket"
}
fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
Ok(syscall!(libc::shutdown(self.fd.as_raw_fd(), self.how()))? as _)
}
}

View file

@@ -1,14 +0,0 @@
//! The async runtime for ntex.
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
// #![warn(missing_docs)]
pub mod driver;
pub mod net;
mod op;
mod rt;
pub use async_task::Task;
pub use rt::{
spawn, spawn_blocking, submit, submit_with_flags, JoinHandle, Runtime, RuntimeBuilder,
};
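A hedged usage sketch of the re-exported API; it assumes `Runtime` exposes a `block_on`-style entry point, which is not visible in this hunk.
fn main() -> std::io::Result<()> {
    let rt = ntex_neon::Runtime::new()?;
    // `block_on` is an assumption of this sketch, not shown in the hunk above.
    rt.block_on(async {
        let task = ntex_neon::spawn(async { 1 + 1 });
        // The task resolves once the spawned future has run on this runtime.
        let _ = task.await;
    });
    Ok(())
}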

View file

@@ -1,11 +0,0 @@
//! Network related.
//!
//! Currently, TCP/UDP/Unix sockets are implemented.
mod socket;
mod tcp;
mod unix;
pub use socket::*;
pub use tcp::*;
pub use unix::*;

View file

@@ -1,207 +0,0 @@
#![allow(clippy::missing_safety_doc)]
use std::{io, mem::MaybeUninit};
use socket2::{Domain, Protocol, SockAddr, Socket as Socket2, Type};
use crate::{driver::AsRawFd, impl_raw_fd, syscall};
#[derive(Debug)]
pub struct Socket {
socket: Socket2,
}
impl Socket {
pub fn from_socket2(socket: Socket2) -> io::Result<Self> {
#[cfg(unix)]
{
#[cfg(not(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "fuchsia",
target_os = "hurd",
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd",
target_os = "espidf",
target_os = "vita",
)))]
socket.set_cloexec(true)?;
#[cfg(any(
target_os = "ios",
target_os = "macos",
target_os = "tvos",
target_os = "watchos",
))]
socket.set_nosigpipe(true)?;
// On Linux we use a blocking socket.
// Newer kernels have a patch that allows arming the io_uring poll mechanism
// for a non-blocking socket when there are no connections in the listen queue:
//
// https://patchwork.kernel.org/project/linux-block/patch/f999615b-205c-49b7-b272-c4e42e45e09d@kernel.dk/#22949861
if cfg!(not(all(target_os = "linux", feature = "io-uring")))
|| crate::driver::DriverType::is_polling()
{
socket.set_nonblocking(true)?;
}
}
Ok(Self { socket })
}
pub fn peer_addr(&self) -> io::Result<SockAddr> {
self.socket.peer_addr()
}
pub fn local_addr(&self) -> io::Result<SockAddr> {
self.socket.local_addr()
}
#[cfg(windows)]
pub async fn new(
domain: Domain,
ty: Type,
protocol: Option<Protocol>,
) -> io::Result<Self> {
use std::panic::resume_unwind;
let socket = crate::spawn_blocking(move || Socket2::new(domain, ty, protocol))
.await
.unwrap_or_else(|e| resume_unwind(e))?;
Self::from_socket2(socket)
}
#[cfg(unix)]
pub async fn new(
domain: Domain,
ty: Type,
protocol: Option<Protocol>,
) -> io::Result<Self> {
use std::os::fd::FromRawFd;
#[allow(unused_mut)]
let mut ty: i32 = ty.into();
#[cfg(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "fuchsia",
target_os = "hurd",
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd",
))]
{
ty |= libc::SOCK_CLOEXEC;
}
let op = crate::driver::op::CreateSocket::new(
domain.into(),
ty,
protocol.map(|p| p.into()).unwrap_or_default(),
);
let (res, _) = crate::submit(op).await;
let socket = unsafe { Socket2::from_raw_fd(res? as _) };
Self::from_socket2(socket)
}
pub async fn bind(
addr: &SockAddr,
ty: Type,
protocol: Option<Protocol>,
) -> io::Result<Self> {
let socket = Self::new(addr.domain(), ty, protocol).await?;
socket.socket.bind(addr)?;
Ok(socket)
}
#[cfg(unix)]
pub unsafe fn get_socket_option<T: Copy>(
&self,
level: i32,
name: i32,
) -> io::Result<T> {
let mut value: MaybeUninit<T> = MaybeUninit::uninit();
let mut len = size_of::<T>() as libc::socklen_t;
syscall!(libc::getsockopt(
self.socket.as_raw_fd(),
level,
name,
value.as_mut_ptr() as _,
&mut len
))
.map(|_| {
debug_assert_eq!(len as usize, size_of::<T>());
// SAFETY: The value is initialized by `getsockopt`.
value.assume_init()
})
}
#[cfg(windows)]
pub unsafe fn get_socket_option<T: Copy>(
&self,
level: i32,
name: i32,
) -> io::Result<T> {
let mut value: MaybeUninit<T> = MaybeUninit::uninit();
let mut len = size_of::<T>() as i32;
syscall!(
SOCKET,
windows_sys::Win32::Networking::WinSock::getsockopt(
self.socket.as_raw_fd() as _,
level,
name,
value.as_mut_ptr() as _,
&mut len
)
)
.map(|_| {
debug_assert_eq!(len as usize, size_of::<T>());
// SAFETY: The value is initialized by `getsockopt`.
value.assume_init()
})
}
#[cfg(unix)]
pub unsafe fn set_socket_option<T: Copy>(
&self,
level: i32,
name: i32,
value: &T,
) -> io::Result<()> {
syscall!(libc::setsockopt(
self.socket.as_raw_fd(),
level,
name,
value as *const _ as _,
std::mem::size_of::<T>() as _
))
.map(|_| ())
}
#[cfg(windows)]
pub unsafe fn set_socket_option<T: Copy>(
&self,
level: i32,
name: i32,
value: &T,
) -> io::Result<()> {
syscall!(
SOCKET,
windows_sys::Win32::Networking::WinSock::setsockopt(
self.socket.as_raw_fd() as _,
level,
name,
value as *const _ as _,
std::mem::size_of::<T>() as _
)
)
.map(|_| ())
}
}
impl_raw_fd!(Socket, Socket2, socket, socket);
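A hedged usage sketch of the generic option accessors above (unix path; the option names are illustrative):
fn tune(socket: &Socket) -> std::io::Result<()> {
    // SAFETY: both options are plain `c_int` values, matching the `T: Copy` bound.
    unsafe {
        let pending: libc::c_int =
            socket.get_socket_option(libc::SOL_SOCKET, libc::SO_ERROR)?;
        log::debug!("pending socket error code: {}", pending);
        socket.set_socket_option(libc::SOL_SOCKET, libc::SO_REUSEADDR, &1i32)?;
    }
    Ok(())
}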

View file

@@ -1,44 +0,0 @@
use std::{io, net::SocketAddr};
use socket2::Socket as Socket2;
use crate::{impl_raw_fd, net::Socket};
/// A TCP stream between a local and a remote socket.
///
/// A TCP stream can either be created by connecting to an endpoint, via the
/// `connect` method, or by accepting a connection from a listener.
#[derive(Debug)]
pub struct TcpStream {
inner: Socket,
}
impl TcpStream {
/// Creates new TcpStream from a std::net::TcpStream.
pub fn from_std(stream: std::net::TcpStream) -> io::Result<Self> {
Ok(Self {
inner: Socket::from_socket2(Socket2::from(stream))?,
})
}
/// Creates new TcpStream from a Socket.
pub fn from_socket(inner: Socket) -> Self {
Self { inner }
}
/// Returns the socket address of the remote peer of this TCP connection.
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.inner
.peer_addr()
.map(|addr| addr.as_socket().expect("should be SocketAddr"))
}
/// Returns the socket address of the local half of this TCP connection.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.inner
.local_addr()
.map(|addr| addr.as_socket().expect("should be SocketAddr"))
}
}
impl_raw_fd!(TcpStream, socket2::Socket, inner, socket);
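Illustrative only (not part of the diff): wrapping an already-connected std stream; the address is a placeholder.
fn wrap() -> std::io::Result<()> {
    // Placeholder address; any connected std stream works here.
    let std_stream = std::net::TcpStream::connect("127.0.0.1:8080")?;
    let stream = TcpStream::from_std(std_stream)?;
    println!("connected: {} -> {}", stream.local_addr()?, stream.peer_addr()?);
    Ok(())
}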

View file

@@ -1,91 +0,0 @@
use std::io;
use socket2::{SockAddr, Socket as Socket2};
use crate::{impl_raw_fd, net::Socket};
/// A Unix stream between two local sockets on Windows & WSL.
///
/// A Unix stream can be created by connecting to an endpoint via the
/// `connect` method.
#[derive(Debug)]
pub struct UnixStream {
inner: Socket,
}
impl UnixStream {
#[cfg(unix)]
/// Creates new UnixStream from a std::os::unix::net::UnixStream.
pub fn from_std(stream: std::os::unix::net::UnixStream) -> io::Result<Self> {
Ok(Self {
inner: Socket::from_socket2(Socket2::from(stream))?,
})
}
/// Creates new UnixStream from a Socket.
pub fn from_socket(inner: Socket) -> Self {
Self { inner }
}
/// Returns the socket path of the remote peer of this connection.
pub fn peer_addr(&self) -> io::Result<SockAddr> {
#[allow(unused_mut)]
let mut addr = self.inner.peer_addr()?;
#[cfg(windows)]
{
fix_unix_socket_length(&mut addr);
}
Ok(addr)
}
/// Returns the socket path of the local half of this connection.
pub fn local_addr(&self) -> io::Result<SockAddr> {
self.inner.local_addr()
}
}
impl_raw_fd!(UnixStream, socket2::Socket, inner, socket);
#[cfg(windows)]
#[inline]
fn empty_unix_socket() -> SockAddr {
use windows_sys::Win32::Networking::WinSock::{AF_UNIX, SOCKADDR_UN};
// SAFETY: the length is correct
unsafe {
SockAddr::try_init(|addr, len| {
let addr: *mut SOCKADDR_UN = addr.cast();
std::ptr::write(
addr,
SOCKADDR_UN {
sun_family: AF_UNIX,
sun_path: [0; 108],
},
);
std::ptr::write(len, 3);
Ok(())
})
}
// it is always Ok
.unwrap()
.1
}
// The peer addr returned after ConnectEx is buggy. It contains bytes that
// should not belong to the address. Luckily a unix path should not contain `\0`
// until the end. We can determine the path ending by that.
#[cfg(windows)]
#[inline]
fn fix_unix_socket_length(addr: &mut SockAddr) {
use windows_sys::Win32::Networking::WinSock::SOCKADDR_UN;
// SAFETY: cannot construct non-unix socket address in safe way.
let unix_addr: &SOCKADDR_UN = unsafe { &*addr.as_ptr().cast() };
let addr_len = match std::ffi::CStr::from_bytes_until_nul(&unix_addr.sun_path) {
Ok(str) => str.to_bytes_with_nul().len() + 2,
Err(_) => std::mem::size_of::<SOCKADDR_UN>(),
};
unsafe {
addr.set_length(addr_len as _);
}
}
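The length fix above is easier to follow with concrete numbers. A small illustrative sketch (not part of the crate; the buffer and path are invented) of the same computation:

// Illustrative sketch: the same length computation as `fix_unix_socket_length`,
// applied to a hand-built 108-byte `sun_path` buffer holding "/tmp/x".
fn example_sun_path_len() {
    let mut sun_path = [0u8; 108];
    sun_path[..6].copy_from_slice(b"/tmp/x");
    let len = match std::ffi::CStr::from_bytes_until_nul(&sun_path) {
        // path bytes + trailing NUL, plus 2 bytes for the `sun_family` field
        Ok(path) => path.to_bytes_with_nul().len() + 2,
        // no NUL found: fall back to the full SOCKADDR_UN size (2 + 108)
        Err(_) => 2 + 108,
    };
    assert_eq!(len, 6 + 1 + 2);
}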

View file

@ -1,38 +0,0 @@
use std::{future::Future, io, pin::Pin, task::Context, task::Poll};
use crate::driver::{Key, OpCode, PushEntry};
use crate::rt::Runtime;
#[derive(Debug)]
pub(crate) struct OpFuture<T: OpCode> {
key: Option<Key<T>>,
}
impl<T: OpCode> OpFuture<T> {
pub(crate) fn new(key: Key<T>) -> Self {
Self { key: Some(key) }
}
}
impl<T: OpCode> Future for OpFuture<T> {
type Output = (io::Result<usize>, T);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let res = Runtime::with_current(|r| r.poll_task(cx, self.key.take().unwrap()));
match res {
PushEntry::Pending(key) => {
self.key = Some(key);
Poll::Pending
}
PushEntry::Ready(res) => Poll::Ready(res),
}
}
}
impl<T: OpCode> Drop for OpFuture<T> {
fn drop(&mut self) {
if let Some(key) = self.key.take() {
Runtime::with_current(|r| r.cancel_op(key));
}
}
}

View file

@ -1,418 +0,0 @@
#![allow(clippy::type_complexity)]
use std::any::{Any, TypeId};
use std::collections::{HashMap, VecDeque};
use std::{
cell::Cell, cell::RefCell, future::Future, io, sync::Arc, task::Context, thread,
time::Duration,
};
use async_task::{Runnable, Task};
use crossbeam_queue::SegQueue;
use crate::driver::{
op::Asyncify, AsRawFd, Key, NotifyHandle, OpCode, Proactor, ProactorBuilder, PushEntry,
RawFd,
};
use crate::op::OpFuture;
scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
/// Type alias for `Task<Result<T, Box<dyn Any + Send>>>`, which resolves to an
/// `Err` when the spawned future panicked.
pub type JoinHandle<T> = Task<Result<T, Box<dyn Any + Send>>>;
pub struct RemoteHandle {
handle: NotifyHandle,
runnables: Arc<RunnableQueue>,
}
impl RemoteHandle {
    /// Wake up the runtime.
pub fn notify(&self) {
self.handle.notify().ok();
}
/// Spawns a new asynchronous task, returning a [`Task`] for it.
///
    /// Spawning a task enables the task to execute concurrently with other tasks.
/// There is no guarantee that a spawned task will execute to completion.
pub fn spawn<F: Future + Send + 'static>(&self, future: F) -> Task<F::Output> {
let runnables = self.runnables.clone();
let handle = self.handle.clone();
let schedule = move |runnable| {
runnables.schedule(runnable, &handle);
};
let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
runnable.schedule();
task
}
}
/// The async runtime for ntex. It is a thread-local runtime and cannot be
/// sent to other threads.
pub struct Runtime {
driver: Proactor,
runnables: Arc<RunnableQueue>,
event_interval: usize,
data: RefCell<HashMap<TypeId, Box<dyn Any>, fxhash::FxBuildHasher>>,
}
impl Runtime {
/// Create [`Runtime`] with default config.
pub fn new() -> io::Result<Self> {
Self::builder().build()
}
/// Create a builder for [`Runtime`].
pub fn builder() -> RuntimeBuilder {
RuntimeBuilder::new()
}
#[allow(clippy::arc_with_non_send_sync)]
fn with_builder(builder: &RuntimeBuilder) -> io::Result<Self> {
Ok(Self {
driver: builder.proactor_builder.build()?,
runnables: Arc::new(RunnableQueue::new()),
event_interval: builder.event_interval,
data: RefCell::new(HashMap::default()),
})
}
/// Perform a function on the current runtime.
///
/// ## Panics
///
    /// This method will panic if there is no running [`Runtime`].
pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
#[cold]
fn not_in_neon_runtime() -> ! {
panic!("not in a neon runtime")
}
if CURRENT_RUNTIME.is_set() {
CURRENT_RUNTIME.with(f)
} else {
not_in_neon_runtime()
}
}
/// Get current driver
pub fn driver(&self) -> &Proactor {
&self.driver
}
/// Get handle for current runtime
pub fn handle(&self) -> RemoteHandle {
RemoteHandle {
handle: self.driver.handle(),
runnables: self.runnables.clone(),
}
}
/// Block on the future till it completes.
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
CURRENT_RUNTIME.set(self, || {
let mut result = None;
unsafe { self.spawn_unchecked(async { result = Some(future.await) }) }.detach();
self.runnables.run(self.event_interval);
loop {
if let Some(result) = result.take() {
return result;
}
self.poll_with_driver(self.runnables.has_tasks(), || {
self.runnables.run(self.event_interval);
});
}
})
}
/// Spawns a new asynchronous task, returning a [`Task`] for it.
///
    /// Spawning a task enables the task to execute concurrently with other tasks.
/// There is no guarantee that a spawned task will execute to completion.
pub fn spawn<F: Future + 'static>(&self, future: F) -> Task<F::Output> {
unsafe { self.spawn_unchecked(future) }
}
/// Spawns a new asynchronous task, returning a [`Task`] for it.
///
/// # Safety
///
/// The caller should ensure the captured lifetime is long enough.
pub unsafe fn spawn_unchecked<F: Future>(&self, future: F) -> Task<F::Output> {
let runnables = self.runnables.clone();
let handle = self.driver.handle();
let schedule = move |runnable| {
runnables.schedule(runnable, &handle);
};
let (runnable, task) = async_task::spawn_unchecked(future, schedule);
runnable.schedule();
task
}
    /// Spawns a blocking task in a new thread and waits for it.
///
/// The task will not be cancelled even if the future is dropped.
pub fn spawn_blocking<F, R>(&self, f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let op = Asyncify::new(move || {
let res = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
(Ok(0), res)
});
// It is safe to use `submit` here because the task is spawned immediately.
unsafe {
let fut = self.submit_with_flags(op);
self.spawn_unchecked(async move { fut.await.1.into_inner() })
}
}
fn submit_raw<T: OpCode + 'static>(
&self,
op: T,
) -> PushEntry<Key<T>, (io::Result<usize>, T)> {
self.driver.push(op)
}
fn submit_with_flags<T: OpCode + 'static>(
&self,
op: T,
) -> impl Future<Output = (io::Result<usize>, T)> {
let fut = self.submit_raw(op);
async move {
match fut {
PushEntry::Pending(user_data) => OpFuture::new(user_data).await,
PushEntry::Ready(res) => res,
}
}
}
pub(crate) fn cancel_op<T: OpCode>(&self, op: Key<T>) {
self.driver.cancel(op);
}
pub(crate) fn poll_task<T: OpCode>(
&self,
cx: &mut Context,
op: Key<T>,
) -> PushEntry<Key<T>, (io::Result<usize>, T)> {
self.driver.pop(op).map_pending(|mut k| {
self.driver.update_waker(&mut k, cx.waker().clone());
k
})
}
fn poll_with_driver<F: FnOnce()>(&self, has_tasks: bool, f: F) {
let timeout = if has_tasks {
Some(Duration::ZERO)
} else {
None
};
if let Err(e) = self.driver.poll(timeout, f) {
match e.kind() {
io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => {
log::debug!("expected error: {e}");
}
_ => panic!("{e:?}"),
}
}
}
/// Insert a type into this runtime.
pub fn insert<T: 'static>(&self, val: T) {
self.data
.borrow_mut()
.insert(TypeId::of::<T>(), Box::new(val));
}
    /// Check if the runtime contains a value of type `T`.
pub fn contains<T: 'static>(&self) -> bool {
self.data.borrow().contains_key(&TypeId::of::<T>())
}
    /// Get a clone of a value of type `T` previously inserted into this runtime.
pub fn get<T>(&self) -> Option<T>
where
T: Clone + 'static,
{
self.data
.borrow()
.get(&TypeId::of::<T>())
.and_then(|boxed| boxed.downcast_ref().cloned())
}
}
impl AsRawFd for Runtime {
fn as_raw_fd(&self) -> RawFd {
self.driver.as_raw_fd()
}
}
impl Drop for Runtime {
fn drop(&mut self) {
CURRENT_RUNTIME.set(self, || {
while self.runnables.sync_runnables.pop().is_some() {}
loop {
let runnable = self.runnables.local_runnables.borrow_mut().pop_front();
if runnable.is_none() {
break;
}
}
})
}
}
struct RunnableQueue {
id: thread::ThreadId,
idle: Cell<bool>,
local_runnables: RefCell<VecDeque<Runnable>>,
sync_runnables: SegQueue<Runnable>,
}
impl RunnableQueue {
fn new() -> Self {
Self {
id: thread::current().id(),
idle: Cell::new(true),
local_runnables: RefCell::new(VecDeque::new()),
sync_runnables: SegQueue::new(),
}
}
fn schedule(&self, runnable: Runnable, handle: &NotifyHandle) {
if self.id == thread::current().id() {
self.local_runnables.borrow_mut().push_back(runnable);
if self.idle.get() {
let _ = handle.notify();
}
} else {
self.sync_runnables.push(runnable);
handle.notify().ok();
}
}
fn run(&self, event_interval: usize) {
self.idle.set(false);
for _ in 0..event_interval {
let task = self.local_runnables.borrow_mut().pop_front();
if let Some(task) = task {
task.run();
} else {
break;
}
}
for _ in 0..event_interval {
if !self.sync_runnables.is_empty() {
if let Some(task) = self.sync_runnables.pop() {
task.run();
continue;
}
}
break;
}
self.idle.set(true);
}
fn has_tasks(&self) -> bool {
!(self.local_runnables.borrow().is_empty() && self.sync_runnables.is_empty())
}
}
/// Builder for [`Runtime`].
#[derive(Debug, Clone)]
pub struct RuntimeBuilder {
proactor_builder: ProactorBuilder,
event_interval: usize,
}
impl Default for RuntimeBuilder {
fn default() -> Self {
Self::new()
}
}
impl RuntimeBuilder {
/// Create the builder with default config.
pub fn new() -> Self {
Self {
proactor_builder: ProactorBuilder::new(),
event_interval: 61,
}
}
/// Replace proactor builder.
pub fn with_proactor(&mut self, builder: ProactorBuilder) -> &mut Self {
self.proactor_builder = builder;
self
}
/// Sets the number of scheduler ticks after which the scheduler will poll
/// for external events (timers, I/O, and so on).
///
/// A scheduler “tick” roughly corresponds to one poll invocation on a task.
pub fn event_interval(&mut self, val: usize) -> &mut Self {
self.event_interval = val;
self
}
/// Build [`Runtime`].
pub fn build(&self) -> io::Result<Runtime> {
Runtime::with_builder(self)
}
}
/// Spawns a new asynchronous task, returning a [`Task`] for it.
///
/// Spawning a task enables the task to execute concurrently with other tasks.
/// There is no guarantee that a spawned task will execute to completion.
///
/// ## Panics
///
/// This method doesn't create a runtime. It tries to obtain the current
/// runtime via [`Runtime::with_current`], which panics if no runtime is running.
pub fn spawn<F: Future + 'static>(future: F) -> Task<F::Output> {
Runtime::with_current(|r| r.spawn(future))
}
/// Spawns a blocking task in a new thread and waits for it.
///
/// The task will not be cancelled even if the future is dropped.
///
/// ## Panics
///
/// This method doesn't create a runtime. It tries to obtain the current
/// runtime via [`Runtime::with_current`], which panics if no runtime is running.
pub fn spawn_blocking<T: Send + 'static>(
f: impl (FnOnce() -> T) + Send + 'static,
) -> JoinHandle<T> {
Runtime::with_current(|r| r.spawn_blocking(f))
}
/// Submit an operation to the current runtime, and return a future for it.
///
/// ## Panics
///
/// This method doesn't create a runtime. It tries to obtain the current
/// runtime via [`Runtime::with_current`], which panics if no runtime is running.
pub async fn submit<T: OpCode + 'static>(op: T) -> (io::Result<usize>, T) {
submit_with_flags(op).await
}
/// Submit an operation to the current runtime, and return a future for it with
/// flags.
///
/// ## Panics
///
/// This method doesn't create a runtime. It tries to obtain the current
/// runtime via [`Runtime::with_current`], which panics if no runtime is running.
pub async fn submit_with_flags<T: OpCode + 'static>(op: T) -> (io::Result<usize>, T) {
Runtime::with_current(|r| r.submit_with_flags(op)).await
}
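Since this runtime now lives in the external ntex-neon repository, here is a hedged usage sketch of the API removed above, assuming the external crate keeps the same public surface (`Runtime::builder`, `event_interval`, `block_on`, `spawn_blocking`); the numbers are arbitrary.

// Hypothetical usage sketch of the runtime API shown above.
fn runtime_example() -> std::io::Result<()> {
    let rt = ntex_neon::Runtime::builder()
        // poll for external events after every 32 task polls
        .event_interval(32)
        .build()?;
    let sum = rt.block_on(async {
        // blocking work runs on a dedicated thread and is awaited here
        rt.spawn_blocking(|| 40 + 2)
            .await
            .expect("blocking task panicked")
    });
    assert_eq!(sum, 42);
    Ok(())
}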

View file

@ -25,10 +25,10 @@ tokio = ["ntex-rt/tokio", "ntex-tokio"]
compio = ["ntex-rt/compio", "ntex-compio"]
# neon runtime
neon = ["ntex-rt/neon", "ntex-neon/polling", "slab", "socket2"]
neon = ["ntex-rt/neon", "ntex-neon", "slab", "socket2"]
# neon io-uring runtime
neon-uring = ["ntex-rt/neon", "ntex-neon/io-uring", "io-uring", "slab", "socket2"]
polling = ["ntex-neon/polling", "dep:polling"]
io-uring = ["ntex-neon/io-uring", "dep:io-uring"]
[dependencies]
ntex-service = "3.3"
@ -43,15 +43,17 @@ ntex-compio = { version = "0.2.4", optional = true }
ntex-neon = { version = "0.1.0", optional = true }
bitflags = { workspace = true }
cfg-if = { workspace = true }
log = { workspace = true }
libc = { workspace = true }
thiserror = { workspace = true }
slab = { workspace = true, optional = true }
socket2 = { workspace = true, optional = true }
thiserror = { workspace = true }
# Linux specific dependencies
[target.'cfg(target_os = "linux")'.dependencies]
io-uring = { workspace = true, optional = true }
polling = { workspace = true, optional = true }
[dev-dependencies]
ntex = "2"

View file

@ -6,51 +6,18 @@ pub use ntex_tokio::{from_tcp_stream, tcp_connect, tcp_connect_in};
#[cfg(all(unix, feature = "tokio"))]
pub use ntex_tokio::{from_unix_stream, unix_connect, unix_connect_in};
#[cfg(all(
feature = "neon",
not(feature = "neon-uring"),
not(feature = "tokio"),
not(feature = "compio")
))]
pub use crate::rt_polling::{
from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect,
unix_connect_in,
};
#[cfg(all(
feature = "neon-uring",
not(feature = "neon"),
not(feature = "tokio"),
not(feature = "compio")
))]
pub use crate::rt_uring::{
from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect,
unix_connect_in,
};
#[cfg(all(
feature = "compio",
not(feature = "tokio"),
not(feature = "neon"),
not(feature = "neon-uring")
))]
#[cfg(all(feature = "compio", not(feature = "tokio"), not(feature = "neon")))]
pub use ntex_compio::{from_tcp_stream, tcp_connect, tcp_connect_in};
#[cfg(all(
unix,
feature = "compio",
not(feature = "tokio"),
not(feature = "neon"),
not(feature = "neon-uring")
not(feature = "neon")
))]
pub use ntex_compio::{from_unix_stream, unix_connect, unix_connect_in};
#[cfg(all(
not(feature = "tokio"),
not(feature = "compio"),
not(feature = "neon"),
not(feature = "neon-uring")
))]
#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
mod no_rt {
use ntex_io::Io;
@ -115,10 +82,5 @@ mod no_rt {
}
}
#[cfg(all(
not(feature = "tokio"),
not(feature = "compio"),
not(feature = "neon"),
not(feature = "neon-uring")
))]
#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
pub use no_rt::*;

View file

@ -8,20 +8,22 @@ pub mod connect;
pub use ntex_io::Io;
pub use ntex_rt::{spawn, spawn_blocking};
pub use self::compat::*;
#[cfg(all(
feature = "neon",
not(feature = "neon-uring"),
not(feature = "tokio"),
not(feature = "compio")
))]
mod rt_polling;
#[cfg(all(
feature = "neon-uring",
not(feature = "neon"),
not(feature = "tokio"),
not(feature = "compio")
))]
mod rt_uring;
cfg_if::cfg_if! {
if #[cfg(all(feature = "neon", target_os = "linux", feature = "io-uring"))] {
#[path = "rt_uring/mod.rs"]
mod rt_impl;
pub use self::rt_impl::{
from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect,
unix_connect_in,
};
} else if #[cfg(all(unix, feature = "neon"))] {
#[path = "rt_polling/mod.rs"]
mod rt_impl;
pub use self::rt_impl::{
from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect,
unix_connect_in,
};
} else {
pub use self::compat::*;
}
}
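Whichever branch the cfg_if selects, downstream code calls the same re-exported connector functions. A brief caller-side sketch (the address is made up, and the signature is assumed to match the existing tokio/compio re-exports):

// Hypothetical caller-side sketch: this compiles against whichever backend
// (tokio, compio, or neon) the crate features select, because every branch
// above re-exports the same set of connector functions.
async fn connect_once() -> std::io::Result<ntex_io::Io> {
    let addr: std::net::SocketAddr = "127.0.0.1:8080".parse().unwrap();
    ntex_net::tcp_connect(addr).await
}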

View file

@ -1,8 +1,7 @@
use std::{cell::RefCell, fmt, io, mem, num::NonZeroU32, rc::Rc, task::Poll};
use io_uring::{opcode, squeue::Entry, types::Fd};
use ntex_neon::driver::op::Handler;
use ntex_neon::driver::{AsRawFd, DriverApi};
use ntex_neon::driver::{op::Handler, AsRawFd, DriverApi};
use ntex_neon::Runtime;
use ntex_util::channel::oneshot;
use slab::Slab;

View file

@ -1,5 +1,11 @@
# Changes
## [2.10.0] - 2025-03-12
* Add "Inplace" channel
* Expose "yield_to" helper
## [2.9.0] - 2025-01-15
* Add EitherService/EitherServiceFactory

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-util"
version = "2.9.0"
version = "2.10.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for ntex framework"
keywords = ["network", "framework", "async", "futures"]

View file

@ -0,0 +1,81 @@
//! A futures-aware bounded(1) channel.
use std::{cell::Cell, fmt, future::poll_fn, task::Context, task::Poll};
use crate::task::LocalWaker;
/// Creates a new futures-aware, bounded(1) channel.
pub fn channel<T>() -> Inplace<T> {
Inplace {
value: Cell::new(None),
rx_task: LocalWaker::new(),
}
}
/// A futures-aware bounded(1) channel.
pub struct Inplace<T> {
value: Cell<Option<T>>,
rx_task: LocalWaker,
}
// The channel never projects Pin to the inner T
impl<T> Unpin for Inplace<T> {}
impl<T> fmt::Debug for Inplace<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Inplace<T>")
}
}
impl<T> Inplace<T> {
    /// Store a value in the channel.
    ///
    /// If the value is successfully stored for the receiving side to consume,
    /// then `Ok(())` is returned. If the previous value has not been consumed
    /// yet, then `Err` is returned with the value provided.
pub fn send(&self, val: T) -> Result<(), T> {
if let Some(v) = self.value.take() {
self.value.set(Some(v));
Err(val)
} else {
self.value.set(Some(val));
self.rx_task.wake();
Ok(())
}
}
    /// Wait until a value is available and return it
pub async fn recv(&self) -> T {
poll_fn(|cx| self.poll_recv(cx)).await
}
    /// Polls the channel to determine if a value is ready
pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<T> {
// If we've got a value, then skip the logic below as we're done.
if let Some(val) = self.value.take() {
return Poll::Ready(val);
}
        // No value yet: register the waker so the task is woken when one is sent.
self.rx_task.register(cx.waker());
Poll::Pending
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::future::lazy;
#[ntex_macros::rt_test2]
async fn test_inplace() {
let ch = channel();
assert_eq!(lazy(|cx| ch.poll_recv(cx)).await, Poll::Pending);
assert!(ch.send(1).is_ok());
assert!(ch.send(2) == Err(2));
assert_eq!(lazy(|cx| ch.poll_recv(cx)).await, Poll::Ready(1));
assert!(ch.send(1).is_ok());
assert_eq!(ch.recv().await, 1);
}
}
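Beyond the unit test, a hedged sketch of cross-task use on a single-threaded runtime; the `Rc` wrapper, the `ntex_rt::spawn` call, and the task split are illustrative assumptions, not part of this module:

// Illustrative sketch: sharing the bounded(1) slot between two local tasks.
async fn inplace_between_tasks() {
    use std::rc::Rc;

    let ch = Rc::new(ntex_util::channel::inplace::channel::<u32>());
    let tx = ch.clone();
    let producer = ntex_rt::spawn(async move {
        // returns Err(42) only if a previously sent value is still unconsumed
        let _ = tx.send(42);
    });
    assert_eq!(ch.recv().await, 42);
    let _ = producer.await; // join the producer task
}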

View file

@ -2,6 +2,7 @@
mod cell;
pub mod condition;
pub mod inplace;
pub mod mpsc;
pub mod oneshot;
pub mod pool;

View file

@ -91,7 +91,6 @@ impl fmt::Debug for LocalWaker {
}
}
#[doc(hidden)]
/// Yields execution back to the current runtime.
pub async fn yield_to() {
use std::{future::Future, pin::Pin, task::Context, task::Poll};

View file

@ -52,7 +52,7 @@ compio = ["ntex-net/compio"]
neon = ["ntex-net/neon"]
# neon runtime
neon-uring = ["ntex-net/neon-uring"]
neon-uring = ["ntex-net/neon", "ntex-net/io-uring"]
# websocket support
ws = ["dep:sha-1"]
@ -69,7 +69,7 @@ ntex-macros = "0.1"
ntex-util = "2.8"
ntex-bytes = "0.1.27"
ntex-server = "2.7"
ntex-h2 = "1.8.1"
ntex-h2 = "1.8.6"
ntex-rt = "0.4.27"
ntex-io = "2.11"
ntex-net = "2.5"

View file

@ -21,6 +21,8 @@ async fn service(msg: ws::Frame) -> Result<Option<ws::Message>, io::Error> {
#[ntex::test]
async fn web_ws() {
let _ = env_logger::try_init();
let srv = test::server(|| {
App::new().service(web::resource("/").route(web::to(
|req: HttpRequest| async move {