Close FD in various case for poll driver (#530)

This commit is contained in:
Nikolay Kim 2025-03-16 12:09:09 +01:00 committed by GitHub
parent f15c3203b1
commit 1f71b200ad
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 50 additions and 41 deletions

View file

@ -1,5 +1,5 @@
use std::os::fd::{AsRawFd, RawFd};
use std::{cell::Cell, collections::VecDeque, io, rc::Rc, task, task::Poll};
use std::{cell::Cell, collections::VecDeque, future::Future, io, rc::Rc, task};
use ntex_neon::driver::{DriverApi, Handler, Interest};
use ntex_neon::{syscall, Runtime};
@ -125,7 +125,7 @@ impl<T> Handler for StreamOpsHandler<T> {
let result = item.context.with_read_buf(|buf| {
let chunk = buf.chunk_mut();
let b = chunk.as_mut_ptr();
Poll::Ready(
task::Poll::Ready(
task::ready!(syscall!(
break libc::read(item.fd, b as _, chunk.len())
))
@ -142,27 +142,37 @@ impl<T> Handler for StreamOpsHandler<T> {
)
});
if result.is_pending() {
if item.io.is_some() && result.is_pending() {
self.inner.api.register(item.fd, id, Interest::Readable);
}
}
Change::Writable => {
let item = &mut streams[id];
let result = item.context.with_write_buf(|buf| {
log::debug!(
"{}: writing {:?} SIZE: {:?}, BUF: {:?}",
item.context.tag(),
item.fd,
buf.len(),
buf,
);
let slice = &buf[..];
syscall!(
break libc::write(item.fd, slice.as_ptr() as _, slice.len())
)
});
if result.is_pending() {
if item.io.is_some() && result.is_pending() {
log::debug!("{}: want write {:?}", item.context.tag(), item.fd,);
self.inner.api.register(item.fd, id, Interest::Writable);
}
}
Change::Error(err) => {
if let Some(item) = streams.get_mut(id) {
item.context.stopped(Some(err));
self.inner.api.unregister_all(item.fd);
if let Some(_) = item.io.take() {
close(id, item.fd, &self.inner.api);
}
}
}
}
@ -183,7 +193,7 @@ impl<T> Handler for StreamOpsHandler<T> {
item.io.is_some()
);
if item.io.is_some() {
self.inner.api.unregister_all(item.fd);
close(id, item.fd, &self.inner.api);
}
}
}
@ -193,19 +203,33 @@ impl<T> Handler for StreamOpsHandler<T> {
}
}
fn close(id: usize, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle<io::Result<i32>> {
api.unregister_all(fd);
ntex_rt::spawn_blocking(move || {
syscall!(libc::shutdown(fd, libc::SHUT_RDWR))?;
syscall!(libc::close(fd))
})
}
impl<T> StreamCtl<T> {
pub(crate) async fn close(self) -> io::Result<()> {
pub(crate) fn close(self) -> impl Future<Output = io::Result<()>> {
let (io, fd) =
self.with(|streams| (streams[self.id].io.take(), streams[self.id].fd));
if let Some(io) = io {
let fut = if let Some(io) = io {
log::debug!("Closing ({}), {:?}", self.id, fd);
std::mem::forget(io);
ntex_rt::spawn_blocking(move || syscall!(libc::close(fd)))
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
.and_then(crate::helpers::pool_io_err)?;
Some(close(self.id, fd, &self.inner.api))
} else {
None
};
async move {
if let Some(fut) = fut {
fut.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
.and_then(crate::helpers::pool_io_err)?;
}
Ok(())
}
Ok(())
}
pub(crate) fn with_io<F, R>(&self, f: F) -> R
@ -257,7 +281,7 @@ impl<T> StreamCtl<T> {
let result = item.context.with_read_buf(|buf| {
let chunk = buf.chunk_mut();
let b = chunk.as_mut_ptr();
Poll::Ready(
task::Poll::Ready(
task::ready!(syscall!(break libc::read(item.fd, b as _, chunk.len())))
.inspect(|size| {
unsafe { buf.advance_mut(*size) };
@ -272,7 +296,7 @@ impl<T> StreamCtl<T> {
)
});
if result.is_pending() {
if item.io.is_some() && result.is_pending() {
self.inner
.api
.register(item.fd, self.id, Interest::Readable);
@ -284,12 +308,6 @@ impl<T> StreamCtl<T> {
self.with(|streams| {
let item = &mut streams[self.id];
log::debug!(
"{}: Resume io write ({}), {:?}",
item.context.tag(),
self.id,
item.fd
);
let result = item.context.with_write_buf(|buf| {
log::debug!(
"{}: Writing io ({}), buf: {:?}",
@ -302,7 +320,7 @@ impl<T> StreamCtl<T> {
syscall!(break libc::write(item.fd, slice.as_ptr() as _, slice.len()))
});
if result.is_pending() {
if item.io.is_some() && result.is_pending() {
log::debug!(
"{}: Write is pending ({}), {:?}",
item.context.tag(),
@ -347,14 +365,14 @@ impl<T> Drop for StreamCtl<T> {
if streams[self.id].ref_count == 0 {
let item = streams.remove(self.id);
log::debug!(
"{}: Drop io ({}), {:?}, has-io: {}",
"{}: Drop io ({}), {:?}, has-io: {}",
item.context.tag(),
self.id,
item.fd,
item.io.is_some()
);
if item.io.is_some() {
self.inner.api.unregister_all(item.fd);
close(self.id, item.fd, &self.inner.api);
}
}
self.inner.streams.set(Some(streams));