mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-04 13:27:39 +03:00
Fix socket connect for io-uring driver (#525)
This commit is contained in:
parent
14d2634e3d
commit
fe108f30c9
7 changed files with 32 additions and 28 deletions
|
@ -1,5 +1,9 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [2.5.1] - 2025-03-14
|
||||||
|
|
||||||
|
* Fix socket connect for io-uring driver
|
||||||
|
|
||||||
## [2.5.0] - 2025-03-12
|
## [2.5.0] - 2025-03-12
|
||||||
|
|
||||||
* Add neon runtime support
|
* Add neon runtime support
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "ntex-net"
|
name = "ntex-net"
|
||||||
version = "2.5.0"
|
version = "2.5.1"
|
||||||
authors = ["ntex contributors <team@ntex.rs>"]
|
authors = ["ntex contributors <team@ntex.rs>"]
|
||||||
description = "ntexwork utils for ntex framework"
|
description = "ntexwork utils for ntex framework"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
|
|
@ -197,7 +197,7 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
|
||||||
Poll::Ready(Ok(sock)) => {
|
Poll::Ready(Ok(sock)) => {
|
||||||
let req = this.req.take().unwrap();
|
let req = this.req.take().unwrap();
|
||||||
log::trace!(
|
log::trace!(
|
||||||
"{}: TCP connector - successfully connected to connecting to {:?} - {:?}",
|
"{}: TCP connector - successfully connected to {:?} - {:?}",
|
||||||
this.tag,
|
this.tag,
|
||||||
req.host(),
|
req.host(),
|
||||||
sock.query::<types::PeerAddr>().get()
|
sock.query::<types::PeerAddr>().get()
|
||||||
|
|
|
@ -75,12 +75,12 @@ impl ConnectOps {
|
||||||
|
|
||||||
impl Handler for ConnectOpsBatcher {
|
impl Handler for ConnectOpsBatcher {
|
||||||
fn readable(&mut self, id: usize) {
|
fn readable(&mut self, id: usize) {
|
||||||
log::debug!("ConnectFD is readable {:?}", id);
|
log::debug!("connect-fd is readable {:?}", id);
|
||||||
self.feed.push_back((id, Change::Readable));
|
self.feed.push_back((id, Change::Readable));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn writable(&mut self, id: usize) {
|
fn writable(&mut self, id: usize) {
|
||||||
log::debug!("ConnectFD is writable {:?}", id);
|
log::debug!("connect-fd is writable {:?}", id);
|
||||||
self.feed.push_back((id, Change::Writable));
|
self.feed.push_back((id, Change::Writable));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,9 +20,11 @@ struct ConnectOpsHandler {
|
||||||
inner: Rc<ConnectOpsInner>,
|
inner: Rc<ConnectOpsInner>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Operations = RefCell<Slab<(Box<SockAddr>, Sender<io::Result<()>>)>>;
|
||||||
|
|
||||||
struct ConnectOpsInner {
|
struct ConnectOpsInner {
|
||||||
api: DriverApi,
|
api: DriverApi,
|
||||||
ops: RefCell<Slab<Sender<io::Result<()>>>>,
|
ops: Operations,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ConnectOps {
|
impl ConnectOps {
|
||||||
|
@ -47,10 +49,17 @@ impl ConnectOps {
|
||||||
addr: SockAddr,
|
addr: SockAddr,
|
||||||
sender: Sender<io::Result<()>>,
|
sender: Sender<io::Result<()>>,
|
||||||
) -> io::Result<()> {
|
) -> io::Result<()> {
|
||||||
let id = self.0.ops.borrow_mut().insert(sender);
|
let addr2 = addr.clone();
|
||||||
|
let mut ops = self.0.ops.borrow_mut();
|
||||||
|
|
||||||
|
// addr must be stable, neon submits ops at the end of rt turn
|
||||||
|
let addr = Box::new(addr);
|
||||||
|
let (addr_ptr, addr_len) = (addr.as_ref().as_ptr(), addr.len());
|
||||||
|
|
||||||
|
let id = ops.insert((addr, sender));
|
||||||
self.0.api.submit(
|
self.0.api.submit(
|
||||||
id as u32,
|
id as u32,
|
||||||
opcode::Connect::new(Fd(fd), addr.as_ptr(), addr.len()).build(),
|
opcode::Connect::new(Fd(fd), addr_ptr, addr_len).build(),
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -59,15 +68,20 @@ impl ConnectOps {
|
||||||
|
|
||||||
impl Handler for ConnectOpsHandler {
|
impl Handler for ConnectOpsHandler {
|
||||||
fn canceled(&mut self, user_data: usize) {
|
fn canceled(&mut self, user_data: usize) {
|
||||||
log::debug!("Op is canceled {:?}", user_data);
|
log::debug!("connect-op is canceled {:?}", user_data);
|
||||||
|
|
||||||
self.inner.ops.borrow_mut().remove(user_data);
|
self.inner.ops.borrow_mut().remove(user_data);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn completed(&mut self, user_data: usize, flags: u32, result: io::Result<i32>) {
|
fn completed(&mut self, user_data: usize, flags: u32, result: io::Result<i32>) {
|
||||||
log::debug!("Op is completed {:?} result: {:?}", user_data, result);
|
let (addr, tx) = self.inner.ops.borrow_mut().remove(user_data);
|
||||||
|
log::debug!(
|
||||||
|
"connect-op is completed {:?} result: {:?}, addr: {:?}",
|
||||||
|
user_data,
|
||||||
|
result,
|
||||||
|
addr.as_socket()
|
||||||
|
);
|
||||||
|
|
||||||
let tx = self.inner.ops.borrow_mut().remove(user_data);
|
|
||||||
let _ = tx.send(result.map(|_| ()));
|
let _ = tx.send(result.map(|_| ()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use std::{cell::RefCell, fmt, io, mem, num::NonZeroU32, os, rc::Rc, task::Poll};
|
use std::{cell::RefCell, io, mem, num::NonZeroU32, os, rc::Rc, task::Poll};
|
||||||
|
|
||||||
use io_uring::{opcode, squeue::Entry, types::Fd};
|
use io_uring::{opcode, squeue::Entry, types::Fd};
|
||||||
use ntex_neon::{driver::DriverApi, driver::Handler, Runtime};
|
use ntex_neon::{driver::DriverApi, driver::Handler, Runtime};
|
||||||
|
@ -392,20 +392,3 @@ impl<T> Drop for StreamCtl<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> PartialEq for StreamCtl<T> {
|
|
||||||
#[inline]
|
|
||||||
fn eq(&self, other: &StreamCtl<T>) -> bool {
|
|
||||||
self.id == other.id && std::ptr::eq(&self.inner, &other.inner)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: fmt::Debug> fmt::Debug for StreamCtl<T> {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
let storage = self.inner.storage.borrow();
|
|
||||||
f.debug_struct("StreamCtl")
|
|
||||||
.field("id", &self.id)
|
|
||||||
.field("io", &storage.streams[self.id].io)
|
|
||||||
.finish()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -60,6 +60,9 @@ ws = ["dep:sha-1"]
|
||||||
# brotli2 support
|
# brotli2 support
|
||||||
brotli = ["dep:brotli2"]
|
brotli = ["dep:brotli2"]
|
||||||
|
|
||||||
|
# disable [ntex::test] logging configuration
|
||||||
|
no-test-logging = []
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
ntex-codec = "0.6"
|
ntex-codec = "0.6"
|
||||||
ntex-http = "0.1.13"
|
ntex-http = "0.1.13"
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue