This commit is contained in:
Nikolay Kim 2025-03-09 01:20:07 +05:00
parent 4c9c1adece
commit 29c944ff63
9 changed files with 57 additions and 57 deletions

View file

@ -21,12 +21,10 @@ ntex-bytes = "0.1"
ntex-io = "2.5"
ntex-util = "2"
ntex-rt = "0.4"
ntex-iodriver = "0.1"
bitflags = "2"
libc = "0.2.164"
log = "0.4"
slab = "0.4"
socket2 = "0.5.6"
compio-runtime = { version = "0.6", features = ["io-uring", "polling"] }
compio-buf = "0.5"
compio-io = "0.5"
compio-net = "0.6"
compio-driver = "0.6"
compio-runtime = { version = "0.6", features = ["io-uring", "polling", "event"] }

View file

@ -19,20 +19,16 @@ impl ReadContext {
Self(io.clone(), Cell::new(None))
}
pub fn clone(&self) -> Self {
Self(self.0.clone(), Cell::new(None))
}
#[inline]
/// Io tag
pub fn tag(&self) -> &'static str {
self.0.tag()
}
#[inline]
/// Io tag
pub fn io(&self) -> IoRef {
self.0.clone()
#[doc(hidden)]
/// Io flags
pub fn flags(&self) -> crate::flags::Flags {
self.0.flags()
}
#[inline]
@ -333,6 +329,12 @@ impl ReadContext {
}
}
impl Clone for ReadContext {
fn clone(&self) -> Self {
Self(self.0.clone(), Cell::new(None))
}
}
fn shutdown_filters(io: &IoRef) {
let st = &io.0;
let flags = st.flags.get();
@ -380,10 +382,6 @@ impl WriteContext {
Self(io.clone())
}
pub fn clone(&self) -> Self {
Self(self.0.clone())
}
#[inline]
/// Io tag
pub fn tag(&self) -> &'static str {
@ -448,30 +446,28 @@ impl WriteContext {
})
.await;
if flush_buf {
if !self.0.flags().contains(Flags::WR_PAUSED) {
st.insert_flags(Flags::WR_TASK_WAIT);
if flush_buf && !self.0.flags().contains(Flags::WR_PAUSED) {
st.insert_flags(Flags::WR_TASK_WAIT);
poll_fn(|cx| {
let flags = self.0.flags();
poll_fn(|cx| {
let flags = self.0.flags();
if flags.intersects(Flags::WR_PAUSED | Flags::IO_STOPPED) {
if flags.intersects(Flags::WR_PAUSED | Flags::IO_STOPPED) {
Poll::Ready(())
} else {
st.write_task.register(cx.waker());
if timeout.is_none() {
timeout = Some(sleep(st.disconnect_timeout.get()));
}
if timeout.as_ref().unwrap().poll_elapsed(cx).is_ready() {
Poll::Ready(())
} else {
st.write_task.register(cx.waker());
if timeout.is_none() {
timeout = Some(sleep(st.disconnect_timeout.get()));
}
if timeout.as_ref().unwrap().poll_elapsed(cx).is_ready() {
Poll::Ready(())
} else {
Poll::Pending
}
Poll::Pending
}
})
.await;
}
}
})
.await;
}
}
@ -551,14 +547,12 @@ impl WriteContext {
io::ErrorKind::WriteZero,
"failed to write frame to transport",
)))
} else if n == buf.len() {
buf.clear();
Poll::Ready(Ok(0))
} else {
if n == buf.len() {
buf.clear();
Poll::Ready(Ok(0))
} else {
buf.advance(n);
Poll::Ready(Ok(buf.len()))
}
buf.advance(n);
Poll::Ready(Ok(buf.len()))
}
}
Err(e) => Poll::Ready(Err(e)),
@ -608,6 +602,12 @@ impl WriteContext {
}
}
impl Clone for WriteContext {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl WriteContextBuf {
pub fn set(&mut self, mut buf: BytesVec) {
if buf.is_empty() {

View file

@ -2,6 +2,7 @@
//! Some types differ by compilation target.
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![allow(clippy::type_complexity)]
#[cfg(all(
target_os = "linux",

View file

@ -1,3 +1,4 @@
#![allow(clippy::type_complexity)]
pub use std::os::fd::{AsRawFd, OwnedFd, RawFd};
use std::{cell::Cell, cell::RefCell, collections::HashMap, io, rc::Rc, sync::Arc};
@ -366,13 +367,13 @@ impl Driver {
match change.interest {
InterestChange::Register(int) => {
let _ = item.register(change.user_data, int);
item.register(change.user_data, int);
}
InterestChange::Unregister(int) => {
let _ = item.unregister(int);
item.unregister(int);
}
InterestChange::UnregisterAll => {
let _ = item.unregister_all();
item.unregister_all();
}
}
}
@ -393,9 +394,9 @@ impl Driver {
self.renew(BorrowedFd::borrow_raw(change.fd), event, &mut registry)?;
if new {
registry.get_mut(&change.fd).map(|item| {
if let Some(item) = registry.get_mut(&change.fd) {
item.flags.remove(Flags::NEW);
});
}
}
}
}

View file

@ -32,7 +32,7 @@ pub(crate) async fn connect(addr: SocketAddr) -> io::Result<TcpStream> {
ConnectOps::current().connect(socket.as_raw_fd(), addr, sender)?;
rx.await
.map_err(|_| io::Error::new(io::ErrorKind::Other, "IO Driver is gone").into())
.map_err(|_| io::Error::new(io::ErrorKind::Other, "IO Driver is gone"))
.and_then(|item| item)?;
Ok(TcpStream::from_socket(socket))
@ -57,7 +57,7 @@ pub(crate) async fn connect_unix(path: impl AsRef<Path>) -> io::Result<UnixStrea
ConnectOps::current().connect(socket.as_raw_fd(), addr, sender)?;
rx.await
.map_err(|_| io::Error::new(io::ErrorKind::Other, "IO Driver is gone").into())
.map_err(|_| io::Error::new(io::ErrorKind::Other, "IO Driver is gone"))
.and_then(|item| item)?;
Ok(UnixStream::from_socket(socket))

View file

@ -275,11 +275,7 @@ impl<T> StreamCtl<T> {
});
if result.is_pending() {
log::debug!(
"Write is pending ({}), {:?}",
self.id,
item.read.io().flags()
);
log::debug!("Write is pending ({}), {:?}", self.id, item.read.flags());
item.flags.insert(Flags::WR);
self.inner

View file

@ -1,3 +1,4 @@
#![allow(clippy::missing_safety_doc)]
use std::{future::Future, io, mem, mem::MaybeUninit};
use ntex_iodriver::{impl_raw_fd, op::CloseSocket, op::ShutdownSocket, syscall, AsRawFd};

View file

@ -1,3 +1,4 @@
#![allow(clippy::type_complexity)]
use std::any::{Any, TypeId};
use std::collections::{HashMap, VecDeque};
use std::future::{ready, Future};
@ -127,6 +128,7 @@ impl Runtime {
RuntimeBuilder::new()
}
#[allow(clippy::arc_with_non_send_sync)]
fn with_builder(builder: &RuntimeBuilder) -> io::Result<Self> {
Ok(Self {
driver: builder.proactor_builder.build()?,

View file

@ -71,6 +71,7 @@ async fn test_listen() {
#[ntex::test]
#[cfg(unix)]
#[allow(clippy::unused_io_amount)]
async fn test_run() {
let addr = TestServer::unused_addr();
let (tx, rx) = mpsc::channel();