mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-04 05:17:39 +03:00
wip
This commit is contained in:
parent
a772e88665
commit
5d29038f30
1 changed files with 46 additions and 83 deletions
|
@ -8,15 +8,6 @@ use slab::Slab;
|
|||
use ntex_bytes::BufMut;
|
||||
use ntex_io::IoContext;
|
||||
|
||||
bitflags::bitflags! {
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
|
||||
struct Flags: u8 {
|
||||
const ERROR = 0b0000_0001;
|
||||
const RD = 0b0000_0010;
|
||||
const WR = 0b0000_0100;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct StreamCtl<T> {
|
||||
id: usize,
|
||||
inner: Rc<StreamOpsInner<T>>,
|
||||
|
@ -26,7 +17,6 @@ struct StreamItem<T> {
|
|||
io: Option<T>,
|
||||
fd: RawFd,
|
||||
context: IoContext,
|
||||
flags: Flags,
|
||||
ref_count: usize,
|
||||
}
|
||||
|
||||
|
@ -76,7 +66,6 @@ impl<T: AsRawFd + 'static> StreamOps<T> {
|
|||
context,
|
||||
fd: io.as_raw_fd(),
|
||||
io: Some(io),
|
||||
flags: Flags::empty(),
|
||||
ref_count: 1,
|
||||
};
|
||||
self.with(|streams| {
|
||||
|
@ -154,10 +143,7 @@ impl<T> Handler for StreamOpsHandler<T> {
|
|||
});
|
||||
|
||||
if result.is_pending() {
|
||||
item.flags.insert(Flags::RD);
|
||||
self.inner.api.register(item.fd, id, Interest::Readable);
|
||||
} else {
|
||||
item.flags.remove(Flags::RD);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -175,21 +161,14 @@ impl<T> Handler for StreamOpsHandler<T> {
|
|||
});
|
||||
|
||||
if result.is_pending() {
|
||||
item.flags.insert(Flags::WR);
|
||||
self.inner.api.register(item.fd, id, Interest::Writable);
|
||||
} else {
|
||||
item.flags.remove(Flags::WR);
|
||||
}
|
||||
}
|
||||
}
|
||||
Change::Error(err) => {
|
||||
if let Some(item) = streams.get_mut(id) {
|
||||
item.context.stopped(Some(err));
|
||||
if !item.flags.contains(Flags::ERROR) {
|
||||
item.flags.insert(Flags::ERROR);
|
||||
item.flags.remove(Flags::RD | Flags::WR);
|
||||
self.inner.api.unregister_all(item.fd);
|
||||
}
|
||||
self.inner.api.unregister_all(item.fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -201,7 +180,7 @@ impl<T> Handler for StreamOpsHandler<T> {
|
|||
let item = &mut streams[id];
|
||||
item.ref_count -= 1;
|
||||
if item.ref_count == 0 {
|
||||
let mut item = streams.remove(id);
|
||||
let item = streams.remove(id);
|
||||
log::debug!(
|
||||
"{}: Drop io ({}), {:?}, has-io: {}",
|
||||
item.context.tag(),
|
||||
|
@ -209,7 +188,6 @@ impl<T> Handler for StreamOpsHandler<T> {
|
|||
item.fd,
|
||||
item.io.is_some()
|
||||
);
|
||||
item.flags.insert(Flags::ERROR);
|
||||
if item.io.is_some() {
|
||||
self.inner.api.unregister_all(item.fd);
|
||||
}
|
||||
|
@ -223,10 +201,8 @@ impl<T> Handler for StreamOpsHandler<T> {
|
|||
|
||||
impl<T> StreamCtl<T> {
|
||||
pub(crate) async fn close(self) -> io::Result<()> {
|
||||
let (io, fd) = self.with(|streams| {
|
||||
streams[self.id].flags.insert(Flags::ERROR);
|
||||
(streams[self.id].io.take(), streams[self.id].fd)
|
||||
});
|
||||
let (io, fd) =
|
||||
self.with(|streams| (streams[self.id].io.take(), streams[self.id].fd));
|
||||
if let Some(io) = io {
|
||||
std::mem::forget(io);
|
||||
|
||||
|
@ -249,16 +225,13 @@ impl<T> StreamCtl<T> {
|
|||
self.with(|streams| {
|
||||
let item = &mut streams[self.id];
|
||||
|
||||
if item.flags.intersects(Flags::RD | Flags::WR) {
|
||||
log::debug!(
|
||||
"{}: Pause all io ({}), {:?}",
|
||||
item.context.tag(),
|
||||
self.id,
|
||||
item.fd
|
||||
);
|
||||
item.flags.remove(Flags::RD | Flags::WR);
|
||||
self.inner.api.unregister_all(item.fd);
|
||||
}
|
||||
log::debug!(
|
||||
"{}: Pause all io ({}), {:?}",
|
||||
item.context.tag(),
|
||||
self.id,
|
||||
item.fd
|
||||
);
|
||||
self.inner.api.unregister_all(item.fd);
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -272,29 +245,23 @@ impl<T> StreamCtl<T> {
|
|||
self.id,
|
||||
item.fd
|
||||
);
|
||||
if item.flags.contains(Flags::RD) {
|
||||
item.flags.remove(Flags::RD);
|
||||
self.inner.api.unregister(item.fd, Interest::Readable);
|
||||
}
|
||||
self.inner.api.unregister(item.fd, Interest::Readable);
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn resume_read(&self) {
|
||||
self.with(|streams| {
|
||||
if let Some(item) = streams.get_mut(self.id) {
|
||||
log::debug!(
|
||||
"{}: Resume io read ({}), {:?}",
|
||||
item.context.tag(),
|
||||
self.id,
|
||||
item.fd
|
||||
);
|
||||
if !item.flags.intersects(Flags::RD | Flags::ERROR) {
|
||||
item.flags.insert(Flags::RD);
|
||||
self.inner
|
||||
.api
|
||||
.register(item.fd, self.id, Interest::Readable);
|
||||
}
|
||||
}
|
||||
let item = &mut streams[self.id];
|
||||
|
||||
log::debug!(
|
||||
"{}: Resume io read ({}), {:?}",
|
||||
item.context.tag(),
|
||||
self.id,
|
||||
item.fd
|
||||
);
|
||||
self.inner
|
||||
.api
|
||||
.register(item.fd, self.id, Interest::Readable);
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -302,38 +269,35 @@ impl<T> StreamCtl<T> {
|
|||
self.with(|streams| {
|
||||
let item = &mut streams[self.id];
|
||||
|
||||
if !item.flags.intersects(Flags::WR | Flags::ERROR) {
|
||||
log::debug!(
|
||||
"{}: Resume io write ({}), {:?}",
|
||||
item.context.tag(),
|
||||
self.id,
|
||||
item.fd
|
||||
);
|
||||
let result = item.context.with_write_buf(|buf| {
|
||||
log::debug!(
|
||||
"{}: Resume io write ({}), {:?}",
|
||||
"{}: Writing io ({}), buf: {:?}",
|
||||
item.context.tag(),
|
||||
self.id,
|
||||
item.fd
|
||||
buf.len()
|
||||
);
|
||||
let result = item.context.with_write_buf(|buf| {
|
||||
log::debug!(
|
||||
"{}: Writing io ({}), buf: {:?}",
|
||||
item.context.tag(),
|
||||
self.id,
|
||||
buf.len()
|
||||
);
|
||||
|
||||
let slice = &buf[..];
|
||||
syscall!(break libc::write(item.fd, slice.as_ptr() as _, slice.len()))
|
||||
});
|
||||
let slice = &buf[..];
|
||||
syscall!(break libc::write(item.fd, slice.as_ptr() as _, slice.len()))
|
||||
});
|
||||
|
||||
if result.is_pending() {
|
||||
log::debug!(
|
||||
"{}: Write is pending ({}), {:?}",
|
||||
item.context.tag(),
|
||||
self.id,
|
||||
item.context.flags()
|
||||
);
|
||||
if result.is_pending() {
|
||||
log::debug!(
|
||||
"{}: Write is pending ({}), {:?}",
|
||||
item.context.tag(),
|
||||
self.id,
|
||||
item.context.flags()
|
||||
);
|
||||
|
||||
item.flags.insert(Flags::WR);
|
||||
self.inner
|
||||
.api
|
||||
.register(item.fd, self.id, Interest::Writable);
|
||||
}
|
||||
self.inner
|
||||
.api
|
||||
.register(item.fd, self.id, Interest::Writable);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -366,7 +330,7 @@ impl<T> Drop for StreamCtl<T> {
|
|||
if let Some(mut streams) = self.inner.streams.take() {
|
||||
streams[self.id].ref_count -= 1;
|
||||
if streams[self.id].ref_count == 0 {
|
||||
let mut item = streams.remove(self.id);
|
||||
let item = streams.remove(self.id);
|
||||
log::debug!(
|
||||
"{}: Drop io ({}), {:?}, has-io: {}",
|
||||
item.context.tag(),
|
||||
|
@ -374,7 +338,6 @@ impl<T> Drop for StreamCtl<T> {
|
|||
item.fd,
|
||||
item.io.is_some()
|
||||
);
|
||||
item.flags.insert(Flags::ERROR);
|
||||
if item.io.is_some() {
|
||||
self.inner.api.unregister_all(item.fd);
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue