This commit is contained in:
Nikolay Kim 2025-03-09 01:24:04 +05:00
parent 29c944ff63
commit 41e9f45bf8
8 changed files with 133 additions and 71 deletions

View file

@ -26,10 +26,6 @@ jobs:
- name: Clean coverage results
run: cargo llvm-cov clean --workspace
- name: Code coverage (glommio)
continue-on-error: true
run: cargo llvm-cov --no-report --all --no-default-features --features="ntex/glommio,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli" -- --skip test_unhandled_data
- name: Code coverage (tokio)
run: cargo llvm-cov --no-report --all --no-default-features --features="ntex/tokio,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli"
@ -37,7 +33,7 @@ jobs:
run: cargo llvm-cov --no-report --all --no-default-features --features="ntex/compio,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli"
- name: Code coverage (default)
run: cargo llvm-cov --no-report --all --no-default-features --features="ntex/default,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli"
run: cargo llvm-cov --no-report --all --no-default-features --features="ntex/default-rt,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli"
- name: Generate coverage report
run: cargo llvm-cov report --lcov --output-path lcov.info --ignore-filename-regex="ntex-compio|ntex-tokio|ntex-glommio|ntex-async-std"

View file

@ -59,12 +59,6 @@ jobs:
run: |
cargo test --all --no-default-features --features="ntex/default-rt,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli"
- name: Run tests (async-std)
timeout-minutes: 40
continue-on-error: true
run: |
cargo test --all --no-default-features --features="ntex/async-std,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli"
- name: Install cargo-cache
continue-on-error: true
run: |

View file

@ -44,6 +44,10 @@ jobs:
timeout-minutes: 40
run: cargo test --all --no-default-features --no-fail-fast --features="ntex/compio,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli"
- name: Run tests (default)
timeout-minutes: 40
run: cargo test --all --no-default-features --no-fail-fast --features="ntex/default-rt,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli"
- name: Install cargo-cache
continue-on-error: true
run: cargo install cargo-cache --no-default-features --features ci-autoclean

View file

@ -69,7 +69,7 @@ libc = { workspace = true }
cfg_aliases = { workspace = true }
[features]
default = ["io-uring"]
default = ["polling"]
polling = ["dep:polling"]
io-uring-sqe128 = []

View file

@ -13,11 +13,15 @@ compile_error!(
"You must choose at least one of these features: [\"io-uring\", \"polling\"]"
);
#[cfg(unix)]
use std::{io, task::Poll, task::Waker, time::Duration};
#[cfg(unix)]
mod key;
#[cfg(unix)]
pub use key::Key;
#[cfg(unix)]
pub mod op;
#[cfg(unix)]
#[cfg_attr(docsrs, doc(cfg(all())))]
@ -25,12 +29,32 @@ mod unix;
#[cfg(unix)]
use unix::Overlapped;
#[cfg(unix)]
mod asyncify;
#[cfg(unix)]
pub use asyncify::*;
mod driver_type;
pub use driver_type::*;
thread_local! {
static LOGGING: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
}
/// enable logging for thread
pub fn enable_logging() {
LOGGING.with(|v| v.set(true));
}
/// enable logging for thread
pub fn log<T: AsRef<str>>(s: T) {
LOGGING.with(|_v| {
//if _v.get() {
println!("{}", s.as_ref());
//}
});
}
cfg_if::cfg_if! {
//if #[cfg(windows)] {
// #[path = "iocp/mod.rs"]
@ -45,6 +69,7 @@ cfg_if::cfg_if! {
}
}
#[cfg(unix)]
pub use sys::*;
#[cfg(windows)]
@ -198,12 +223,14 @@ impl<K, R> PushEntry<K, R> {
}
}
#[cfg(unix)]
/// Low-level actions of completion-based IO.
/// It owns the operations to keep the driver safe.
pub struct Proactor {
driver: Driver,
}
#[cfg(unix)]
impl Proactor {
/// Create [`Proactor`] with 1024 entries.
pub fn new() -> io::Result<Self> {
@ -306,6 +333,7 @@ impl Proactor {
}
}
#[cfg(unix)]
impl AsRawFd for Proactor {
fn as_raw_fd(&self) -> RawFd {
self.driver.as_raw_fd()
@ -313,6 +341,7 @@ impl AsRawFd for Proactor {
}
/// An completed entry returned from kernel.
#[cfg(unix)]
#[derive(Debug)]
pub(crate) struct Entry {
user_data: usize,
@ -320,6 +349,7 @@ pub(crate) struct Entry {
flags: u32,
}
#[cfg(unix)]
impl Entry {
pub(crate) fn new(user_data: usize, result: io::Result<usize>) -> Self {
Self {
@ -361,18 +391,21 @@ impl Entry {
}
}
#[cfg(unix)]
#[derive(Debug, Clone)]
enum ThreadPoolBuilder {
Create { limit: usize, recv_limit: Duration },
Reuse(AsyncifyPool),
}
#[cfg(unix)]
impl Default for ThreadPoolBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(unix)]
impl ThreadPoolBuilder {
pub fn new() -> Self {
Self::Create {
@ -390,6 +423,7 @@ impl ThreadPoolBuilder {
}
/// Builder for [`Proactor`].
#[cfg(unix)]
#[derive(Debug, Clone)]
pub struct ProactorBuilder {
capacity: u32,
@ -397,12 +431,14 @@ pub struct ProactorBuilder {
sqpoll_idle: Option<Duration>,
}
#[cfg(unix)]
impl Default for ProactorBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(unix)]
impl ProactorBuilder {
/// Create the builder with default config.
pub fn new() -> Self {

View file

@ -111,23 +111,38 @@ impl FdItem {
}
#[derive(Debug)]
enum InterestChange {
Register(Interest),
Unregister(Interest),
UnregisterAll,
enum Change {
Register {
fd: RawFd,
batch: usize,
user_data: usize,
int: Interest,
},
Unregister {
fd: RawFd,
batch: usize,
int: Interest,
},
UnregisterAll {
fd: RawFd,
batch: usize,
},
Blocking {
user_data: usize,
},
}
#[derive(Debug)]
struct BatchChange {
fd: RawFd,
batch: usize,
user_data: usize,
interest: InterestChange,
}
// #[derive(Debug)]
// struct BatchChange {
// fd: RawFd,
// batch: usize,
// user_data: usize,
// interest: InterestChange,
// }
pub struct DriverApi {
batch: usize,
changes: Rc<RefCell<Vec<BatchChange>>>,
changes: Rc<RefCell<Vec<Change>>>,
}
impl DriverApi {
@ -138,11 +153,11 @@ impl DriverApi {
fd,
user_data
);
self.change(BatchChange {
self.change(Change::Register {
fd,
user_data,
batch: self.batch,
interest: InterestChange::Register(int),
user_data,
int,
});
}
@ -153,24 +168,21 @@ impl DriverApi {
fd,
self.batch
);
self.change(BatchChange {
self.change(Change::Unregister {
fd,
user_data: 0,
batch: self.batch,
interest: InterestChange::Unregister(int),
int,
});
}
pub fn unregister_all(&self, fd: RawFd) {
self.change(BatchChange {
self.change(Change::UnregisterAll {
fd,
user_data: 0,
batch: self.batch,
interest: InterestChange::UnregisterAll,
});
}
fn change(&self, change: BatchChange) {
fn change(&self, change: Change) {
self.changes.borrow_mut().push(change);
}
}
@ -183,7 +195,7 @@ pub(crate) struct Driver {
pool: AsyncifyPool,
pool_completed: Arc<SegQueue<Entry>>,
hid: Cell<usize>,
changes: Rc<RefCell<Vec<BatchChange>>>,
changes: Rc<RefCell<Vec<Change>>>,
handlers: Cell<Option<Box<Vec<Box<dyn Handler>>>>>,
}
@ -236,7 +248,7 @@ impl Driver {
registry: &mut HashMap<RawFd, FdItem>,
) -> io::Result<()> {
if !renew_event.readable && !renew_event.writable {
//println!("DELETE - 2");
// crate::log(format!("DELETE - {:?}", fd.as_raw_fd()));
if let Some(item) = registry.remove(&fd.as_raw_fd()) {
if !item.flags.contains(Flags::NEW) {
@ -246,12 +258,12 @@ impl Driver {
} else {
if let Some(item) = registry.get(&fd.as_raw_fd()) {
if item.flags.contains(Flags::NEW) {
//println!("ADD - 2 {:?}", fd.as_raw_fd());
// crate::log(format!("ADD - {:?}", fd.as_raw_fd()));
unsafe { self.poll.add(&fd, renew_event)? };
return Ok(());
}
}
//println!("MODIFY - 2");
// crate::log(format!("MODIFY - {:?} {:?}", fd.as_raw_fd(), renew_event));
self.poll.modify(fd, renew_event)?;
}
Ok(())
@ -266,7 +278,12 @@ impl Driver {
let op_pin = op.as_op_pin();
match op_pin.pre_submit()? {
Decision::Completed(res) => Poll::Ready(Ok(res)),
Decision::Blocking => self.push_blocking(user_data),
Decision::Blocking => {
self.changes
.borrow_mut()
.push(Change::Blocking { user_data });
Poll::Pending
}
}
}
@ -282,9 +299,10 @@ impl Driver {
}
let mut events = self.events.borrow_mut();
self.poll.wait(&mut events, timeout)?;
let res = self.poll.wait(&mut events, timeout);
res?;
if events.is_empty() && timeout != Some(Duration::ZERO) {
if events.is_empty() && timeout != Some(Duration::ZERO) && timeout.is_some() {
return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
}
// println!("POLL, events: {:?}", events.len());
@ -360,42 +378,55 @@ impl Driver {
let mut registry = self.registry.borrow_mut();
for change in &mut *changes {
let item = registry
.entry(change.fd)
.or_insert_with(|| FdItem::new(change.batch));
debug_assert!(item.batch == change.batch, "{:?} - {:?}", item, change);
match change.interest {
InterestChange::Register(int) => {
item.register(change.user_data, int);
match change {
Change::Register {
fd,
batch,
user_data,
int,
} => {
let item = registry.entry(*fd).or_insert_with(|| FdItem::new(*batch));
item.register(*user_data, *int);
}
InterestChange::Unregister(int) => {
item.unregister(int);
Change::Unregister { fd, batch, int } => {
let item = registry.entry(*fd).or_insert_with(|| FdItem::new(*batch));
item.unregister(*int);
}
InterestChange::UnregisterAll => {
Change::UnregisterAll { fd, batch } => {
let item = registry.entry(*fd).or_insert_with(|| FdItem::new(*batch));
item.unregister_all();
}
_ => {}
}
}
for change in changes.drain(..) {
let result = registry.get_mut(&change.fd).and_then(|item| {
if item.flags.contains(Flags::CHANGED) {
item.flags.remove(Flags::CHANGED);
Some((
item.event(change.fd as usize),
item.flags.contains(Flags::NEW),
))
} else {
let fd = match change {
Change::Register { fd, .. } => Some(fd),
Change::Unregister { fd, .. } => Some(fd),
Change::UnregisterAll { fd, .. } => Some(fd),
Change::Blocking { user_data } => {
self.push_blocking(user_data);
None
}
});
if let Some((event, new)) = result {
self.renew(BorrowedFd::borrow_raw(change.fd), event, &mut registry)?;
};
if new {
if let Some(item) = registry.get_mut(&change.fd) {
item.flags.remove(Flags::NEW);
if let Some(fd) = fd {
let result = registry.get_mut(&fd).and_then(|item| {
if item.flags.contains(Flags::CHANGED) {
item.flags.remove(Flags::CHANGED);
Some((item.event(fd as usize), item.flags.contains(Flags::NEW)))
} else {
None
}
});
if let Some((event, new)) = result {
self.renew(BorrowedFd::borrow_raw(fd), event, &mut registry)?;
if new {
if let Some(item) = registry.get_mut(&fd) {
item.flags.remove(Flags::NEW);
}
}
}
}
@ -404,7 +435,8 @@ impl Driver {
Ok(())
}
fn push_blocking(&self, user_data: usize) -> Poll<io::Result<usize>> {
fn push_blocking(&self, user_data: usize) {
// -> Poll<io::Result<usize>> {
let poll = self.poll.clone();
let completed = self.pool_completed.clone();
let mut closure = move || {
@ -419,7 +451,7 @@ impl Driver {
};
loop {
match self.pool.dispatch(closure) {
Ok(()) => return Poll::Pending,
Ok(()) => return,
Err(e) => {
closure = e.0;
self.poll_blocking();

View file

@ -317,7 +317,9 @@ impl Runtime {
io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => {
log::debug!("expected error: {e}");
}
_ => panic!("{e:?}"),
_ => {
panic!("{e:?}")
}
},
}
}

View file

@ -114,8 +114,6 @@ webpki-roots = { version = "0.26", optional = true }
brotli2 = { version = "0.3.2", optional = true }
flate2 = { version = "1.0", optional = true }
compio-driver = "*"
[dev-dependencies]
env_logger = "0.11"
rand = "0.8"