This commit is contained in:
Nikolay Kim 2025-03-15 00:27:49 +05:00
parent 05510c7b88
commit f06b4952d5

View file

@ -201,7 +201,7 @@ impl<T> Handler for StreamOpsHandler<T> {
let item = &mut streams[id];
item.ref_count -= 1;
if item.ref_count == 0 {
let item = streams.remove(id);
let mut item = streams.remove(id);
log::debug!(
"{}: Drop io ({}), {:?}, has-io: {}",
item.context.tag(),
@ -209,6 +209,7 @@ impl<T> Handler for StreamOpsHandler<T> {
item.fd,
item.io.is_some()
);
item.flags.insert(Flags::ERROR);
if item.io.is_some() {
self.inner.api.unregister_all(item.fd);
}
@ -222,8 +223,10 @@ impl<T> Handler for StreamOpsHandler<T> {
impl<T> StreamCtl<T> {
pub(crate) async fn close(self) -> io::Result<()> {
let (io, fd) =
self.with(|streams| (streams[self.id].io.take(), streams[self.id].fd));
let (io, fd) = self.with(|streams| {
streams[self.id].flags.insert(Flags::ERROR);
(streams[self.id].io.take(), streams[self.id].fd)
});
if let Some(io) = io {
std::mem::forget(io);
@ -286,7 +289,7 @@ impl<T> StreamCtl<T> {
self.id,
item.fd
);
if !item.flags.contains(Flags::RD) {
if !item.flags.intersects(Flags::RD | Flags::ERROR) {
item.flags.insert(Flags::RD);
self.inner
.api
@ -299,7 +302,7 @@ impl<T> StreamCtl<T> {
self.with(|streams| {
let item = &mut streams[self.id];
if !item.flags.contains(Flags::WR) {
if !item.flags.intersects(Flags::WR | Flags::ERROR) {
log::debug!(
"{}: Resume io write ({}), {:?}",
item.context.tag(),
@ -363,7 +366,7 @@ impl<T> Drop for StreamCtl<T> {
if let Some(mut streams) = self.inner.streams.take() {
streams[self.id].ref_count -= 1;
if streams[self.id].ref_count == 0 {
let item = streams.remove(self.id);
let mut item = streams.remove(self.id);
log::debug!(
"{}: Drop io ({}), {:?}, has-io: {}",
item.context.tag(),
@ -371,6 +374,7 @@ impl<T> Drop for StreamCtl<T> {
item.fd,
item.io.is_some()
);
item.flags.insert(Flags::ERROR);
if item.io.is_some() {
self.inner.api.unregister_all(item.fd);
}