This commit is contained in:
Nikolay Kim 2025-03-15 00:11:10 +05:00
parent ff028c393b
commit 05510c7b88

View file

@ -132,48 +132,54 @@ impl<T> Handler for StreamOpsHandler<T> {
for (id, change) in self.feed.drain(..) { for (id, change) in self.feed.drain(..) {
match change { match change {
Change::Readable => { Change::Readable => {
let item = &mut streams[id]; if let Some(item) = streams.get_mut(id) {
let result = item.context.with_read_buf(|buf| { let result = item.context.with_read_buf(|buf| {
let chunk = buf.chunk_mut(); let chunk = buf.chunk_mut();
let b = chunk.as_mut_ptr(); let b = chunk.as_mut_ptr();
Poll::Ready( Poll::Ready(
task::ready!(syscall!( task::ready!(syscall!(
break libc::read(item.fd, b as _, chunk.len()) break libc::read(item.fd, b as _, chunk.len())
)) ))
.inspect(|size| { .inspect(|size| {
unsafe { buf.advance_mut(*size) }; unsafe { buf.advance_mut(*size) };
log::debug!( log::debug!(
"{}: {:?}, SIZE: {:?}, BUF: {:?}", "{}: {:?}, SIZE: {:?}, BUF: {:?}",
item.context.tag(), item.context.tag(),
item.fd, item.fd,
size, size,
buf buf,
); );
}), }),
) )
}); });
if result.is_pending() { if result.is_pending() {
item.flags.insert(Flags::RD); item.flags.insert(Flags::RD);
self.inner.api.register(item.fd, id, Interest::Readable); self.inner.api.register(item.fd, id, Interest::Readable);
} else { } else {
item.flags.remove(Flags::RD); item.flags.remove(Flags::RD);
}
} }
} }
Change::Writable => { Change::Writable => {
let item = &mut streams[id]; if let Some(item) = streams.get_mut(id) {
let result = item.context.with_write_buf(|buf| { let result = item.context.with_write_buf(|buf| {
let slice = &buf[..]; let slice = &buf[..];
syscall!( syscall!(
break libc::write(item.fd, slice.as_ptr() as _, slice.len()) break libc::write(
) item.fd,
}); slice.as_ptr() as _,
slice.len()
)
)
});
if result.is_pending() { if result.is_pending() {
item.flags.insert(Flags::WR); item.flags.insert(Flags::WR);
self.inner.api.register(item.fd, id, Interest::Writable); self.inner.api.register(item.fd, id, Interest::Writable);
} else { } else {
item.flags.remove(Flags::WR); item.flags.remove(Flags::WR);
}
} }
} }
Change::Error(err) => { Change::Error(err) => {
@ -203,7 +209,9 @@ impl<T> Handler for StreamOpsHandler<T> {
item.fd, item.fd,
item.io.is_some() item.io.is_some()
); );
self.inner.api.unregister_all(item.fd); if item.io.is_some() {
self.inner.api.unregister_all(item.fd);
}
} }
} }
@ -363,7 +371,9 @@ impl<T> Drop for StreamCtl<T> {
item.fd, item.fd,
item.io.is_some() item.io.is_some()
); );
self.inner.api.unregister_all(item.fd); if item.io.is_some() {
self.inner.api.unregister_all(item.fd);
}
} }
self.inner.streams.set(Some(streams)); self.inner.streams.set(Some(streams));
} else { } else {