diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index b4851692..18d70d3a 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.5.7] - 2025-03-21 + +* Simplify neon poll impl + ## [2.5.6] - 2025-03-20 * Redesign neon poll support diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 11d7f3bf..6dc6af0f 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-net" -version = "2.5.6" +version = "2.5.7" authors = ["ntex contributors "] description = "ntexwork utils for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -40,7 +40,7 @@ ntex-util = "2.5" ntex-tokio = { version = "0.5.3", optional = true } ntex-compio = { version = "0.2.4", optional = true } -ntex-neon = { version = "0.1.6", optional = true } +ntex-neon = { version = "0.1.7", optional = true } bitflags = { workspace = true } cfg-if = { workspace = true } diff --git a/ntex-net/src/rt_polling/connect.rs b/ntex-net/src/rt_polling/connect.rs index 529c55b1..8f0f1dc9 100644 --- a/ntex-net/src/rt_polling/connect.rs +++ b/ntex-net/src/rt_polling/connect.rs @@ -1,5 +1,5 @@ use std::os::fd::{AsRawFd, RawFd}; -use std::{cell::RefCell, collections::VecDeque, io, rc::Rc, task::Poll}; +use std::{cell::RefCell, io, rc::Rc, task::Poll}; use ntex_neon::driver::{DriverApi, Event, Handler}; use ntex_neon::{syscall, Runtime}; @@ -17,7 +17,6 @@ enum Change { } struct ConnectOpsBatcher { - feed: VecDeque<(usize, Change)>, inner: Rc, } @@ -41,10 +40,7 @@ impl ConnectOps { connects: RefCell::new(Slab::new()), }); inner = Some(ops.clone()); - Box::new(ConnectOpsBatcher { - inner: ops, - feed: VecDeque::new(), - }) + Box::new(ConnectOpsBatcher { inner: ops }) }); ConnectOps(inner.unwrap()) @@ -74,55 +70,42 @@ impl ConnectOps { impl Handler for ConnectOpsBatcher { fn event(&mut self, id: usize, event: Event) { log::debug!("connect-fd is readable {:?}", id); - self.feed.push_back((id, Change::Event(event))); - } - - fn error(&mut self, id: usize, err: io::Error) { - self.feed.push_back((id, Change::Error(err))); - } - - fn commit(&mut self) { - if self.feed.is_empty() { - return; - } - log::debug!("Commit connect driver changes, num: {:?}", self.feed.len()); let mut connects = self.inner.connects.borrow_mut(); - for (id, change) in self.feed.drain(..) { - if connects.contains(id) { - let item = connects.remove(id); - match change { - Change::Event(event) => { - if event.writable { - let mut err: libc::c_int = 0; - let mut err_len = - std::mem::size_of::() as libc::socklen_t; + if connects.contains(id) { + let item = connects.remove(id); + if event.writable { + let mut err: libc::c_int = 0; + let mut err_len = std::mem::size_of::() as libc::socklen_t; - let res = syscall!(libc::getsockopt( - item.fd.as_raw_fd(), - libc::SOL_SOCKET, - libc::SO_ERROR, - &mut err as *mut _ as *mut _, - &mut err_len - )); + let res = syscall!(libc::getsockopt( + item.fd.as_raw_fd(), + libc::SOL_SOCKET, + libc::SO_ERROR, + &mut err as *mut _ as *mut _, + &mut err_len + )); - let res = if err == 0 { - res.map(|_| ()) - } else { - Err(io::Error::from_raw_os_error(err)) - }; + let res = if err == 0 { + res.map(|_| ()) + } else { + Err(io::Error::from_raw_os_error(err)) + }; - self.inner.api.detach(item.fd, id as u32); - let _ = item.sender.send(res); - } - } - Change::Error(err) => { - let _ = item.sender.send(Err(err)); - self.inner.api.detach(item.fd, id as u32); - } - } + self.inner.api.detach(item.fd, id as u32); + let _ = item.sender.send(res); } } } + + fn error(&mut self, id: usize, err: io::Error) { + let mut connects = self.inner.connects.borrow_mut(); + + if connects.contains(id) { + let item = connects.remove(id); + let _ = item.sender.send(Err(err)); + self.inner.api.detach(item.fd, id as u32); + } + } } diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index 385f15fa..eaa6d589 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -1,5 +1,5 @@ use std::os::fd::{AsRawFd, RawFd}; -use std::{cell::Cell, collections::VecDeque, future::Future, io, rc::Rc, task}; +use std::{cell::Cell, cell::RefCell, future::Future, io, rc::Rc, task, task::Poll}; use ntex_neon::driver::{DriverApi, Event, Handler}; use ntex_neon::{syscall, Runtime}; @@ -14,7 +14,7 @@ pub(crate) struct StreamCtl { } bitflags::bitflags! { - #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] + #[derive(Copy, Clone, Debug)] struct Flags: u8 { const RD = 0b0000_0001; const WR = 0b0000_0010; @@ -31,23 +31,23 @@ struct StreamItem { pub(crate) struct StreamOps(Rc>); -#[derive(Debug)] -enum Change { - Event(Event), - Error(io::Error), -} - struct StreamOpsHandler { - feed: VecDeque<(usize, Change)>, inner: Rc>, } struct StreamOpsInner { api: DriverApi, - feed: Cell>>, + delayd_drop: Cell, + feed: RefCell>, streams: Cell>>>>, } +impl StreamItem { + fn tag(&self) -> &'static str { + self.context.tag() + } +} + impl StreamOps { pub(crate) fn current() -> Self { Runtime::value(|rt| { @@ -55,14 +55,12 @@ impl StreamOps { rt.driver().register(|api| { let ops = Rc::new(StreamOpsInner { api, - feed: Cell::new(Some(VecDeque::new())), + feed: RefCell::new(Vec::new()), + delayd_drop: Cell::new(false), streams: Cell::new(Some(Box::new(Slab::new()))), }); inner = Some(ops.clone()); - Box::new(StreamOpsHandler { - inner: ops, - feed: VecDeque::new(), - }) + Box::new(StreamOpsHandler { inner: ops }) }); StreamOps(inner.unwrap()) @@ -71,34 +69,27 @@ impl StreamOps { pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl { let fd = io.as_raw_fd(); - let item = StreamItem { - fd, - context, - io: Some(io), - ref_count: 1, - flags: Flags::empty(), - }; - let stream = self.with(move |streams| { - let id = streams.insert(item) as u32; + let stream = self.0.with(move |streams| { + let item = StreamItem { + fd, + context, + io: Some(io), + ref_count: 1, + flags: Flags::empty(), + }; StreamCtl { - id, + id: streams.insert(item) as u32, inner: self.0.clone(), } }); - self.0.api.attach(fd, stream.id, None); + self.0.api.attach( + fd, + stream.id, + Some(Event::new(0, false, false).with_interrupt()), + ); stream } - - fn with(&self, f: F) -> R - where - F: FnOnce(&mut Slab>) -> R, - { - let mut inner = self.0.streams.take().unwrap(); - let result = f(&mut inner); - self.0.streams.set(Some(inner)); - result - } } impl Clone for StreamOps { @@ -108,128 +99,112 @@ impl Clone for StreamOps { } impl Handler for StreamOpsHandler { - fn event(&mut self, id: usize, event: Event) { - log::debug!("FD is readable {:?}", id); - self.feed.push_back((id, Change::Event(event))); + fn event(&mut self, id: usize, ev: Event) { + log::debug!("FD event {:?} event: {:?}", id, ev); + + self.inner.with(|streams| { + if !streams.contains(id) { + return; + } + let item = &mut streams[id]; + + // handle HUP + if ev.is_interrupt() { + item.context.stopped(None); + if item.io.take().is_some() { + close(id as u32, item.fd, &self.inner.api); + } + return; + } + + let mut renew_ev = Event::new(0, false, false).with_interrupt(); + + if ev.readable { + let res = item.context.with_read_buf(|buf| { + let chunk = buf.chunk_mut(); + let result = task::ready!(syscall!( + break libc::read(item.fd, chunk.as_mut_ptr() as _, chunk.len()) + )); + if let Ok(size) = result { + log::debug!("{}: data {:?}, s: {:?}", item.tag(), item.fd, size); + unsafe { buf.advance_mut(size) }; + } + Poll::Ready(result) + }); + + if res.is_pending() && item.context.is_read_ready() { + renew_ev.readable = true; + item.flags.insert(Flags::RD); + } else { + item.flags.remove(Flags::RD); + } + } else if item.flags.contains(Flags::RD) { + renew_ev.readable = true; + } + + if ev.writable { + let result = item.context.with_write_buf(|buf| { + log::debug!("{}: write {:?} s: {:?}", item.tag(), item.fd, buf.len()); + syscall!(break libc::write(item.fd, buf[..].as_ptr() as _, buf.len())) + }); + if result.is_pending() { + renew_ev.writable = true; + item.flags.insert(Flags::WR); + } else { + item.flags.remove(Flags::WR); + } + } else if item.flags.contains(Flags::WR) { + renew_ev.writable = true; + } + + self.inner.api.modify(item.fd, id as u32, renew_ev); + + // delayed drops + if self.inner.delayd_drop.get() { + for id in self.inner.feed.borrow_mut().drain(..) { + let item = &mut streams[id as usize]; + item.ref_count -= 1; + if item.ref_count == 0 { + let item = streams.remove(id as usize); + log::debug!( + "{}: Drop ({}), {:?}, has-io: {}", + item.tag(), + id, + item.fd, + item.io.is_some() + ); + if item.io.is_some() { + close(id, item.fd, &self.inner.api); + } + } + } + self.inner.delayd_drop.set(false); + } + }); } fn error(&mut self, id: usize, err: io::Error) { - log::debug!("FD is failed {:?}, err: {:?}", id, err); - self.feed.push_back((id, Change::Error(err))); + self.inner.with(|streams| { + if let Some(item) = streams.get_mut(id) { + log::debug!("FD is failed ({}) {:?}, err: {:?}", id, item.fd, err); + item.context.stopped(Some(err)); + if item.io.take().is_some() { + close(id as u32, item.fd, &self.inner.api); + } + } + }) } +} - fn commit(&mut self) { - if self.feed.is_empty() { - return; - } - log::debug!("Commit changes, num: {:?}", self.feed.len()); - - let mut streams = self.inner.streams.take().unwrap(); - - for (id, change) in self.feed.drain(..) { - match change { - Change::Event(ev) => { - let item = &mut streams[id]; - let mut renew_ev = Event::new(0, false, false).with_interrupt(); - if ev.readable { - let result = item.context.with_read_buf(|buf| { - let chunk = buf.chunk_mut(); - let b = chunk.as_mut_ptr(); - task::Poll::Ready( - task::ready!(syscall!( - break libc::read(item.fd, b as _, chunk.len()) - )) - .inspect(|size| { - unsafe { buf.advance_mut(*size) }; - log::debug!( - "{}: {:?}, SIZE: {:?}", - item.context.tag(), - item.fd, - size - ); - }), - ) - }); - - if item.io.is_some() && result.is_pending() { - if item.context.is_read_ready() { - renew_ev.readable = true; - } - } - } else if item.flags.contains(Flags::RD) { - renew_ev.readable = true; - } - - if ev.writable { - let result = item.context.with_write_buf(|buf| { - log::debug!( - "{}: writing {:?} SIZE: {:?}", - item.context.tag(), - item.fd, - buf.len() - ); - let slice = &buf[..]; - syscall!( - break libc::write( - item.fd, - slice.as_ptr() as _, - slice.len() - ) - ) - }); - - if item.io.is_some() && result.is_pending() { - renew_ev.writable = true; - } - } else if item.flags.contains(Flags::WR) { - renew_ev.writable = true; - } - - if ev.is_interrupt() { - item.context.stopped(None); - if let Some(_) = item.io.take() { - close(id as u32, item.fd, &self.inner.api); - } - continue; - } else { - item.flags.set(Flags::RD, renew_ev.readable); - item.flags.set(Flags::WR, renew_ev.writable); - self.inner.api.modify(item.fd, id as u32, renew_ev); - } - } - Change::Error(err) => { - if let Some(item) = streams.get_mut(id) { - item.context.stopped(Some(err)); - if let Some(_) = item.io.take() { - close(id as u32, item.fd, &self.inner.api); - } - } - } - } - } - - // extra - let mut feed = self.inner.feed.take().unwrap(); - for id in feed.drain(..) { - let item = &mut streams[id as usize]; - item.ref_count -= 1; - if item.ref_count == 0 { - let item = streams.remove(id as usize); - log::debug!( - "{}: Drop io ({}), {:?}, has-io: {}", - item.context.tag(), - id, - item.fd, - item.io.is_some() - ); - if item.io.is_some() { - close(id, item.fd, &self.inner.api); - } - } - } - - self.inner.feed.set(Some(feed)); - self.inner.streams.set(Some(streams)); +impl StreamOpsInner { + fn with(&self, f: F) -> R + where + F: FnOnce(&mut Slab>) -> R, + { + let mut streams = self.streams.take().unwrap(); + let result = f(&mut streams); + self.streams.set(Some(streams)); + result } } @@ -244,7 +219,9 @@ fn close(id: u32, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle StreamCtl { pub(crate) fn close(self) -> impl Future> { let id = self.id as usize; - let (io, fd) = self.with(|streams| (streams[id].io.take(), streams[id].fd)); + let (io, fd) = self + .inner + .with(|streams| (streams[id].io.take(), streams[id].fd)); let fut = if let Some(io) = io { log::debug!("Closing ({}), {:?}", id, fd); std::mem::forget(io); @@ -266,90 +243,84 @@ impl StreamCtl { where F: FnOnce(Option<&T>) -> R, { - self.with(|streams| f(streams[self.id as usize].io.as_ref())) + self.inner + .with(|streams| f(streams[self.id as usize].io.as_ref())) } - pub(crate) fn modify(&self, readable: bool, writable: bool) { - self.with(|streams| { + pub(crate) fn modify(&self, rd: bool, wr: bool) { + self.inner.with(|streams| { let item = &mut streams[self.id as usize]; - item.flags = Flags::empty(); log::debug!( - "{}: Modify interest ({}), {:?} read: {:?}, write: {:?}", - item.context.tag(), + "{}: Modify interest ({}), {:?} rd: {:?}, wr: {:?}", + item.tag(), self.id, item.fd, - readable, - writable + rd, + wr ); let mut event = Event::new(0, false, false).with_interrupt(); - if readable { - let result = item.context.with_read_buf(|buf| { - let chunk = buf.chunk_mut(); - let b = chunk.as_mut_ptr(); - task::Poll::Ready( - task::ready!(syscall!( - break libc::read(item.fd, b as _, chunk.len()) - )) - .inspect(|size| { - unsafe { buf.advance_mut(*size) }; + if rd { + if item.flags.contains(Flags::RD) { + event.readable = true; + } else { + let res = item.context.with_read_buf(|buf| { + let chunk = buf.chunk_mut(); + let result = task::ready!(syscall!( + break libc::read(item.fd, chunk.as_mut_ptr() as _, chunk.len()) + )); + if let Ok(size) = result { log::debug!( - "{}: {:?}, SIZE: {:?}", - item.context.tag(), + "{}: read {:?}, s: {:?}", + item.tag(), item.fd, size ); - }), - ) - }); + unsafe { buf.advance_mut(size) }; + } + Poll::Ready(result) + }); - if item.io.is_some() && result.is_pending() { - if item.context.is_read_ready() { + if res.is_pending() && item.context.is_read_ready() { event.readable = true; item.flags.insert(Flags::RD); } } } - if writable { - let result = item.context.with_write_buf(|buf| { - log::debug!( - "{}: Writing io ({}), buf: {:?}", - item.context.tag(), - self.id, - buf.len() - ); - - let slice = &buf[..]; - syscall!(break libc::write(item.fd, slice.as_ptr() as _, slice.len())) - }); - - if item.io.is_some() && result.is_pending() { + if wr { + if item.flags.contains(Flags::WR) { event.writable = true; - item.flags.insert(Flags::WR); + } else { + let result = item.context.with_write_buf(|buf| { + log::debug!( + "{}: Writing ({}), buf: {:?}", + item.tag(), + self.id, + buf.len() + ); + syscall!( + break libc::write(item.fd, buf[..].as_ptr() as _, buf.len()) + ) + }); + + if result.is_pending() { + event.writable = true; + item.flags.insert(Flags::WR); + } } } - self.inner.api.modify(item.fd, self.id as u32, event); + self.inner.api.modify(item.fd, self.id, event); }) } - - fn with(&self, f: F) -> R - where - F: FnOnce(&mut Slab>) -> R, - { - let mut inner = self.inner.streams.take().unwrap(); - let result = f(&mut inner); - self.inner.streams.set(Some(inner)); - result - } } impl Clone for StreamCtl { fn clone(&self) -> Self { - self.with(|streams| { + self.inner.with(|streams| { streams[self.id as usize].ref_count += 1; Self { id: self.id, @@ -368,7 +339,7 @@ impl Drop for StreamCtl { let item = streams.remove(id); log::debug!( "{}: Drop io ({}), {:?}, has-io: {}", - item.context.tag(), + item.tag(), self.id, item.fd, item.io.is_some() @@ -379,9 +350,8 @@ impl Drop for StreamCtl { } self.inner.streams.set(Some(streams)); } else { - let mut feed = self.inner.feed.take().unwrap(); - feed.push_back(self.id); - self.inner.feed.set(Some(feed)); + self.inner.delayd_drop.set(true); + self.inner.feed.borrow_mut().push(self.id); } } }