This commit is contained in:
Nikolay Kim 2025-03-15 01:08:34 +05:00
parent 5d29038f30
commit f240db8296

View file

@ -121,48 +121,42 @@ impl<T> Handler for StreamOpsHandler<T> {
for (id, change) in self.feed.drain(..) {
match change {
Change::Readable => {
if let Some(item) = streams.get_mut(id) {
let result = item.context.with_read_buf(|buf| {
let chunk = buf.chunk_mut();
let b = chunk.as_mut_ptr();
Poll::Ready(
task::ready!(syscall!(
break libc::read(item.fd, b as _, chunk.len())
))
.inspect(|size| {
unsafe { buf.advance_mut(*size) };
log::debug!(
"{}: {:?}, SIZE: {:?}, BUF: {:?}",
item.context.tag(),
item.fd,
size,
buf,
);
}),
)
});
let item = &mut streams[id];
let result = item.context.with_read_buf(|buf| {
let chunk = buf.chunk_mut();
let b = chunk.as_mut_ptr();
Poll::Ready(
task::ready!(syscall!(
break libc::read(item.fd, b as _, chunk.len())
))
.inspect(|size| {
unsafe { buf.advance_mut(*size) };
log::debug!(
"{}: {:?}, SIZE: {:?}, BUF: {:?}",
item.context.tag(),
item.fd,
size,
buf,
);
}),
)
});
if result.is_pending() {
self.inner.api.register(item.fd, id, Interest::Readable);
}
if result.is_pending() {
self.inner.api.register(item.fd, id, Interest::Readable);
}
}
Change::Writable => {
if let Some(item) = streams.get_mut(id) {
let result = item.context.with_write_buf(|buf| {
let slice = &buf[..];
syscall!(
break libc::write(
item.fd,
slice.as_ptr() as _,
slice.len()
)
)
});
let item = &mut streams[id];
let result = item.context.with_write_buf(|buf| {
let slice = &buf[..];
syscall!(
break libc::write(item.fd, slice.as_ptr() as _, slice.len())
)
});
if result.is_pending() {
self.inner.api.register(item.fd, id, Interest::Writable);
}
if result.is_pending() {
self.inner.api.register(item.fd, id, Interest::Writable);
}
}
Change::Error(err) => {
@ -259,9 +253,30 @@ impl<T> StreamCtl<T> {
self.id,
item.fd
);
self.inner
.api
.register(item.fd, self.id, Interest::Readable);
let result = item.context.with_read_buf(|buf| {
let chunk = buf.chunk_mut();
let b = chunk.as_mut_ptr();
Poll::Ready(
task::ready!(syscall!(break libc::read(item.fd, b as _, chunk.len())))
.inspect(|size| {
unsafe { buf.advance_mut(*size) };
log::debug!(
"{}: {:?}, SIZE: {:?}, BUF: {:?}",
item.context.tag(),
item.fd,
size,
buf,
);
}),
)
});
if result.is_pending() {
self.inner
.api
.register(item.fd, self.id, Interest::Readable);
}
})
}