From 05510c7b88ef763366dba88c1b5371ed959b7e57 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 15 Mar 2025 00:11:10 +0500 Subject: [PATCH] wip --- ntex-net/src/rt_polling/driver.rs | 88 +++++++++++++++++-------------- 1 file changed, 49 insertions(+), 39 deletions(-) diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index 2f35e98d..72d22607 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -132,48 +132,54 @@ impl Handler for StreamOpsHandler { for (id, change) in self.feed.drain(..) { match change { Change::Readable => { - let item = &mut streams[id]; - let result = item.context.with_read_buf(|buf| { - let chunk = buf.chunk_mut(); - let b = chunk.as_mut_ptr(); - Poll::Ready( - task::ready!(syscall!( - break libc::read(item.fd, b as _, chunk.len()) - )) - .inspect(|size| { - unsafe { buf.advance_mut(*size) }; - log::debug!( - "{}: {:?}, SIZE: {:?}, BUF: {:?}", - item.context.tag(), - item.fd, - size, - buf - ); - }), - ) - }); + if let Some(item) = streams.get_mut(id) { + let result = item.context.with_read_buf(|buf| { + let chunk = buf.chunk_mut(); + let b = chunk.as_mut_ptr(); + Poll::Ready( + task::ready!(syscall!( + break libc::read(item.fd, b as _, chunk.len()) + )) + .inspect(|size| { + unsafe { buf.advance_mut(*size) }; + log::debug!( + "{}: {:?}, SIZE: {:?}, BUF: {:?}", + item.context.tag(), + item.fd, + size, + buf, + ); + }), + ) + }); - if result.is_pending() { - item.flags.insert(Flags::RD); - self.inner.api.register(item.fd, id, Interest::Readable); - } else { - item.flags.remove(Flags::RD); + if result.is_pending() { + item.flags.insert(Flags::RD); + self.inner.api.register(item.fd, id, Interest::Readable); + } else { + item.flags.remove(Flags::RD); + } } } Change::Writable => { - let item = &mut streams[id]; - let result = item.context.with_write_buf(|buf| { - let slice = &buf[..]; - syscall!( - break libc::write(item.fd, slice.as_ptr() as _, slice.len()) - ) - }); + if let Some(item) = streams.get_mut(id) { + let result = item.context.with_write_buf(|buf| { + let slice = &buf[..]; + syscall!( + break libc::write( + item.fd, + slice.as_ptr() as _, + slice.len() + ) + ) + }); - if result.is_pending() { - item.flags.insert(Flags::WR); - self.inner.api.register(item.fd, id, Interest::Writable); - } else { - item.flags.remove(Flags::WR); + if result.is_pending() { + item.flags.insert(Flags::WR); + self.inner.api.register(item.fd, id, Interest::Writable); + } else { + item.flags.remove(Flags::WR); + } } } Change::Error(err) => { @@ -203,7 +209,9 @@ impl Handler for StreamOpsHandler { item.fd, item.io.is_some() ); - self.inner.api.unregister_all(item.fd); + if item.io.is_some() { + self.inner.api.unregister_all(item.fd); + } } } @@ -363,7 +371,9 @@ impl Drop for StreamCtl { item.fd, item.io.is_some() ); - self.inner.api.unregister_all(item.fd); + if item.io.is_some() { + self.inner.api.unregister_all(item.fd); + } } self.inner.streams.set(Some(streams)); } else {